def _build_pcollection()

in tensorflow_datasets/audio/nsynth.py [0:0]


  def _build_pcollection(self, pipeline, tfrecord_dirs, ids, split):
    """Builds a Beam PCollection of `(id, example)` pairs for `split`.

    Reads all TFRecord shards under `tfrecord_dirs`, converts each
    `tf.train.Example` into a TFDS example dict keyed by its `note_str` id,
    keeps only the examples whose id belongs to this split, and — when the
    builder config enables it — augments each example with CREPE-estimated
    f0 and A-weighted loudness features.

    Args:
      pipeline: Root Beam pipeline to attach the stages to.
      tfrecord_dirs: Directories containing source TFRecord files; every file
        in each directory is read via the glob pattern `<dir>/*`.
      ids: Collection of example ids belonging to this split. A falsy value
        (e.g. empty) keeps all examples.
      split: Split name; used only to namespace the Beam metric counters.

    Returns:
      A PCollection of `(id, example_dict)` tuples.
    """
    # Lazy import: Beam is only required when actually generating the dataset.
    beam = tfds.core.lazy_imports.apache_beam

    def _emit_base_example(ex):
      """Maps an input `tf.train.Example` to a keyed TFDS example dict."""
      beam.metrics.Metrics.counter(split, "base-examples").inc()
      features = ex.features.feature
      # `note_str` (raw bytes) doubles as the example's unique id / Beam key.
      id_ = features["note_str"].bytes_list.value[0]
      return id_, {
          "id":
              id_,
          "audio":
              np.array(features["audio"].float_list.value, dtype=np.float32),
          "pitch":
              features["pitch"].int64_list.value[0],
          "velocity":
              features["velocity"].int64_list.value[0],
          "instrument": {
              # Instrument metadata is stored as bytes; decode to text.
              "label":
                  tf.compat.as_text(
                      features["instrument_str"].bytes_list.value[0]),
              "family":
                  tf.compat.as_text(
                      features["instrument_family_str"].bytes_list.value[0]),
              "source":
                  tf.compat.as_text(
                      features["instrument_source_str"].bytes_list.value[0])
          },
          # Fan the parallel int64 quality flags out into one named field per
          # quality. Assumes _QUALITIES is ordered to match the proto's
          # "qualities" list — defined elsewhere in this module.
          "qualities": {
              q: features["qualities"].int64_list.value[i]
              for (i, q) in enumerate(_QUALITIES)
          }
      }

    def _in_split(id_ex, split_ids):
      """Returns True if the example belongs to the split (or no filter set)."""
      unused_id, ex = id_ex
      # Empty/falsy `split_ids` means "keep everything".
      if not split_ids or tf.compat.as_text(ex["id"]) in split_ids:
        beam.metrics.Metrics.counter(split, "in-split").inc()
        return True
      return False

    def _estimate_f0(id_ex):
      """Estimate the fundamental frequency using CREPE and add to example."""
      id_, ex = id_ex
      beam.metrics.Metrics.counter(split, "estimate-f0").inc()

      audio = ex["audio"]

      # Copied from magenta/ddsp/spectral_ops.py
      # Pad end so that `num_frames = _NUM_SECS * _F0_AND_LOUDNESS_RATE`.
      # NOTE: hop_size is deliberately a float here (true division); the
      # assert below guards that the resulting padding is a whole number of
      # samples before the int() cast.
      hop_size = _AUDIO_RATE / _F0_AND_LOUDNESS_RATE
      n_samples = len(audio)
      n_frames = _NUM_SECS * _F0_AND_LOUDNESS_RATE
      n_samples_padded = (n_frames - 1) * hop_size + _CREPE_FRAME_SIZE
      n_padding = (n_samples_padded - n_samples)
      assert n_padding % 1 == 0
      audio = np.pad(audio, (0, int(n_padding)), mode="constant")
      crepe_step_size = 1000 / _F0_AND_LOUDNESS_RATE  # milliseconds

      # CREPE returns (time, frequency, confidence, activation); only the
      # per-frame frequency and confidence are kept.
      _, f0_hz, f0_confidence, _ = tfds.core.lazy_imports.crepe.predict(
          audio,
          sr=_AUDIO_RATE,
          viterbi=True,
          step_size=crepe_step_size,
          center=False,
          verbose=0)
      f0_midi = tfds.core.lazy_imports.librosa.core.hz_to_midi(f0_hz)
      # Set -infs introduced by hz_to_midi to 0.
      f0_midi[f0_midi == -np.inf] = 0
      # Set nans to 0 in confidence.
      f0_confidence = np.nan_to_num(f0_confidence)
      # Copy before mutating so upstream fused stages never see the new key.
      ex = dict(ex)
      ex["f0"] = {
          "hz": f0_hz.astype(np.float32),
          "midi": f0_midi.astype(np.float32),
          "confidence": f0_confidence.astype(np.float32),
      }
      return id_, ex

    def _calc_loudness(id_ex):
      """Compute loudness, add to example (ref is white noise, amplitude=1)."""
      id_, ex = id_ex
      beam.metrics.Metrics.counter(split, "compute-loudness").inc()

      audio = ex["audio"]

      # Copied from magenta/ddsp/spectral_ops.py
      # Get magnitudes.
      # Unlike _estimate_f0, hop_size is an integer here (floor division) —
      # this mirrors the original magenta code; do not unify the two.
      hop_size = int(_AUDIO_RATE // _F0_AND_LOUDNESS_RATE)

      # Add padding to the end
      n_samples_initial = int(audio.shape[-1])
      n_frames = int(np.ceil(n_samples_initial / hop_size))
      n_samples_final = (n_frames - 1) * hop_size + _LD_N_FFT
      pad = n_samples_final - n_samples_initial
      audio = np.pad(audio, ((0, pad),), "constant")

      librosa = tfds.core.lazy_imports.librosa
      # Transposed so frames are along axis 0 and frequency bins along axis 1.
      spectra = librosa.stft(
          audio, n_fft=_LD_N_FFT, hop_length=hop_size, center=False).T

      # Compute power
      amplitude = np.abs(spectra)
      amin = 1e-20  # Avoid log(0) instabilities.
      power_db = np.log10(np.maximum(amin, amplitude))
      power_db *= 20.0

      # Perceptual weighting.
      frequencies = librosa.fft_frequencies(sr=_AUDIO_RATE, n_fft=_LD_N_FFT)
      # Broadcast per-bin A-weighting across all frames.
      a_weighting = librosa.A_weighting(frequencies)[np.newaxis, :]
      loudness = power_db + a_weighting

      # Set dynamic range.
      loudness -= _REF_DB
      loudness = np.maximum(loudness, -_LD_RANGE)

      # Average over frequency bins.
      mean_loudness_db = np.mean(loudness, axis=-1)

      # Copy before mutating so upstream fused stages never see the new key.
      ex = dict(ex)
      ex["loudness"] = {"db": mean_loudness_db.astype(np.float32)}
      return id_, ex

    # Base pipeline: glob shards -> parse protos -> key by id -> split filter.
    examples = (
        pipeline
        | beam.Create([os.path.join(dir_, "*") for dir_ in tfrecord_dirs])
        | beam.io.tfrecordio.ReadAllFromTFRecord(
            coder=beam.coders.ProtoCoder(tf.train.Example))
        | beam.Map(_emit_base_example)
        | beam.Filter(_in_split, split_ids=ids))
    if self.builder_config.estimate_f0_and_loudness:
      # Reshuffle first to rebalance workers before the expensive (CREPE
      # inference) per-example stages.
      examples = (
          examples
          | beam.Reshuffle()
          | beam.Map(_estimate_f0)
          | beam.Map(_calc_loudness))

    return examples