in parallel-processing/process.py
import hashlib
import math
import os
import time

from google.cloud import storage

# Cloud Run jobs set these environment variables for each task automatically.
TASK_INDEX = int(os.getenv("CLOUD_RUN_TASK_INDEX", 0))
TASK_COUNT = int(os.getenv("CLOUD_RUN_TASK_COUNT", 1))

# Bucket and object to process; these variable names are assumed for this sample.
INPUT_BUCKET = os.getenv("INPUT_BUCKET")
INPUT_FILE = os.getenv("INPUT_FILE")

storage_client = storage.Client()


def process():
    method_start = time.time()

    # Output useful information about the processing starting.
    print(
        f"Task {TASK_INDEX}: Processing part {TASK_INDEX} of {TASK_COUNT} "
        f"for gs://{INPUT_BUCKET}/{INPUT_FILE}"
    )
    # Download the Cloud Storage object.
    bucket = storage_client.bucket(INPUT_BUCKET)
    blob = bucket.blob(INPUT_FILE)

    # Split the blob into a list of strings, one per line.
    contents = blob.download_as_bytes().decode("utf-8")
    data = contents.split("\n")
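    # Design note: every task downloads the full object and then slices out
    # its own chunk. That keeps the code simple for modest files; for very
    # large objects, a ranged read per task would avoid redundant downloads.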
    # Determine the chunk size, and identify this task's chunk to process.
    chunk_size = math.ceil(len(data) / TASK_COUNT)
    chunk_start = chunk_size * TASK_INDEX
    chunk_end = chunk_start + chunk_size
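    # For example, with 10,000 lines and 10 tasks, chunk_size is 1,000 and
    # task 3 processes lines 3,000-3,999. The final task's slice may run past
    # len(data); Python slicing simply stops at the end of the list.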
    # Process each line in the chunk.
    count = 0
    loop_start = time.time()
    for line in data[chunk_start:chunk_end]:
        # Perform your operation here. This is just a placeholder.
        _ = hashlib.md5(line.encode("utf-8")).hexdigest()
        time.sleep(0.1)
        count += 1
    # Output useful information about the processing completed.
    time_taken = round(time.time() - method_start, 3)
    time_setup = round(loop_start - method_start, 3)
    print(
        f"Task {TASK_INDEX}: Processed {count} lines "
        f"(ln {chunk_start}-{min(chunk_end, len(data)) - 1} of {len(data)}) "
        f"in {time_taken}s ({time_setup}s preparing)"
    )
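

# A minimal entry-point sketch, assuming this module runs as the job's
# container command. The try/except wrapper and the log message are
# assumptions, not part of the excerpt above; re-raising ensures the task
# exits nonzero so Cloud Run marks it failed and can retry it.
if __name__ == "__main__":
    try:
        process()
    except Exception as err:
        print(f"Task {TASK_INDEX} failed: {err}")
        raise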