in asfpy/justone.py [0:0]
def _temp_fifo(fifo_fname):
# Create the FIFO
try:
os.mkfifo(fifo_fname)
except OSError:
# Likely RACE: a FIFO exists/appeared (and mkfifo failed).
# Let the other process run with this.
yield False # not okay
return # stop iterating
try:
# Open it for reading, to signal "we got this".
fd = os.open(fifo_fname, os.O_RDONLY | os.O_NONBLOCK)
try:
# Okay to run.
yield True
finally:
os.close(fd)
finally:
# Note: there might be a RACE where the OPEN fails after
# we created the FIFO (eg. another process removed it).
# Not sure how this would happen. Just bail.
try:
os.unlink(fifo_fname)
except OSError:
# In case the FIFO disappeared somehow: we don't care.
pass
# stop iterating
return