Python 3 BrokenPipeError pipe test (last update: 2018-01-04, created: 2018-01-04) back to the list ↑
Sample test code for the "Python 3 BrokenPipeError woes (debugging notes)" post.

#!/usr/bin/python3
import threading
import queue
import time
import os

# Filesystem path of the named pipe (FIFO) that test() recreates and that
# both the reader thread and the writer open.
PIPENAME = '/tmp/mypipe'
# When True, print progress messages as each side opens/closes the pipe.
VERBOSE = False


class Reader(threading.Thread):
  """Background thread that operates the read end of the named pipe.

  The public methods open(), read(), close() and end() do not act directly:
  each one enqueues a command string which run() later executes on the
  reader thread. sync() blocks until every queued command has been handled.
  """

  def __init__(self):
    super().__init__()
    # Command queue consumed by run(); sync() waits on it via join().
    self.q = queue.Queue()

  def open(self):
    """Queue a request to open PIPENAME for reading."""
    self.q.put('open')

  def read(self):
    """Queue a request to read everything from the pipe and report the size."""
    self.q.put('read')

  def close(self):
    """Queue a request to close the reader's file object."""
    self.q.put('close')

  def end(self):
    """Queue a request for the thread to leave its command loop."""
    self.q.put('end')

  def sync(self):
    """Block until all commands queued so far have been processed."""
    self.q.join()

  def _cmd_open(self):
    # Runs on the reader thread; `open` here is the builtin, not the method.
    if VERBOSE:
      print("Reader opening pipe.")
    self.fd = open(PIPENAME, "r")
    if VERBOSE:
      print("Reader successfully opened pipe.")

  def _cmd_read(self):
    contents = self.fd.read()
    print("Reader read %u byte(s)." % len(contents))

  def _cmd_close(self):
    self.fd.close()
    if VERBOSE:
      print("Reader disconnected.")

  def run(self):
    """Execute queued commands one at a time until 'end' is received."""
    actions = {
      'open': self._cmd_open,
      'read': self._cmd_read,
      'close': self._cmd_close,
    }

    # iter() with a sentinel keeps fetching (blocking) until 'end' arrives.
    for command in iter(self.q.get, 'end'):
      action = actions.get(command)
      if action is None:
        # Unrecognised command: skipped without being marked as done.
        continue
      action()
      self.q.task_done()

    # Acknowledge the 'end' command itself before the thread exits.
    self.q.task_done()


def test(data_size):
  """Run one experiment: write data_size bytes to a FIFO that has no reader.

  Steps, in order: recreate the FIFO, connect a reader thread and a writer,
  disconnect the reader, write and flush data_size bytes (printing which
  call, if any, raised BrokenPipeError), reconnect the reader, close the
  writer, and let the reader report how many bytes it received.
  """
  def log(message):
    # Progress chatter, shown only when VERBOSE is enabled.
    if VERBOSE:
      print(message)

  print("\n--- Testing %u byte(s)" % data_size)
  payload = "A" * data_size

  # Start from a fresh FIFO; failing to remove an old one is ignored.
  try:
    os.remove(PIPENAME)
  except OSError:
    pass
  os.mkfifo(PIPENAME)

  rd = Reader()
  rd.start()

  # Queue the reader-side open first, then open the write end here;
  # sync() returns once the reader thread has completed its open.
  rd.open()
  log("Writer opening pipe.")
  wr = open(PIPENAME, "w")
  log("Writer successfully opened pipe.")
  rd.sync()

  # Disconnect the reader so the pipe has no read end during the write.
  rd.close()
  rd.sync()

  # Record whether write() or flush() is the call that fails, if either.
  try:
    wr.write(payload)
  except BrokenPipeError:
    outcome = "Writer raised BrokenPipeError on write()."
  else:
    try:
      wr.flush()
    except BrokenPipeError:
      outcome = "Writer raised BrokenPipeError on flush()."
    else:
      outcome = "Writer didn't raise an exception."
  print(outcome)

  # Reconnect a reader before closing the writer.
  rd.open()
  rd.sync()

  log("Writer is closing the pipe.")
  wr.close()

  # Let the reader drain the pipe and print the byte count.
  rd.read()
  rd.sync()

  rd.close()
  rd.sync()

  # Shut the reader thread down and wait for it to exit.
  rd.end()
  rd.join()

# Payload sizes sit at and just past the 4096- and 8192-byte marks, plus a
# single byte as the smallest case.
for size in (1, 4096, 4097, 8192, 8193):
  test(size)


Output on Python 3.5/3.6:

--- Testing 1 byte(s)
Writer raised BrokenPipeError on flush().
Reader read 1 byte(s).

--- Testing 4096 byte(s)
Writer raised BrokenPipeError on flush().
Reader read 4096 byte(s).

--- Testing 4097 byte(s)
Writer raised BrokenPipeError on flush().
Reader read 0 byte(s).

--- Testing 8192 byte(s)
Writer raised BrokenPipeError on flush().
Reader read 0 byte(s).

--- Testing 8193 byte(s)
Writer raised BrokenPipeError on write().
Reader read 0 byte(s).

【 design & art by Xa / Gynvael Coldwind 】 【 logo font (birdman regular) by utopiafonts / Dale Harris 】