Skip to content

Error in graph analyzer when using the Universal Sentence Encoder from TF Hub, as per the tutorial #160

@jusjosgra

Description

@jusjosgra

I am getting an error when running the following code with the DirectRunner.

Code:

import tensorflow as tf
import apache_beam as beam
import tensorflow_transform.beam as tft_beam
import tensorflow_transform.coders as tft_coders
from apache_beam.options.pipeline_options import PipelineOptions
import tempfile

# Universal Sentence Encoder instance loaded lazily by embed_text(), plus the
# default graph that was current when it was loaded. The pair lets embed_text()
# reload the model instead of reusing one created under a different graph.
model = None
_model_graph = None


def embed_text(text):
    """Embed ``text`` with the Universal Sentence Encoder v4 from TF Hub.

    ``tensorflow_hub`` is imported lazily, inside the function, and the loaded
    model is cached in the module-level ``model``. The cache is keyed on the
    current default graph (``tf.compat.v1.get_default_graph()``). If this is
    called while building a different graph than the cached model was loaded
    into, the model is loaded again for that graph. This avoids mixing
    resources from one graph into another when ``preprocess_fn`` is traced
    more than once.

    NOTE(review): the reported failure (``AttributeError: 'Tensor' object has
    no attribute 'type'`` in tf.Transform's graph_tools) appears to come from
    how this tf.Transform version inspects table initializers. This change
    does not address that. Upgrading tensorflow_transform, or using a
    TF1-format Hub module, may also be needed. Confirm against the installed
    versions.

    Args:
        text: string tensor of sentences to embed.

    Returns:
        Whatever the loaded encoder returns for ``text``.
    """
    import tensorflow_hub as hub
    global model, _model_graph

    current_graph = tf.compat.v1.get_default_graph()
    if model is None or _model_graph is not current_graph:
        model = hub.load(
            'https://tfhub.dev/google/universal-sentence-encoder/4')
        _model_graph = current_graph
    return model(text)


def get_metadata():
    """Return the input ``DatasetMetadata`` for the pipeline.

    The schema has two columns, ``'id'`` and ``'text'``. Each is a
    ``tf.string`` with an empty shape ``[]`` and a fixed-column
    representation.

    NOTE(review): ``dataset_schema`` is the legacy tf.Transform schema API.
    Newer releases use ``schema_utils.schema_from_feature_spec``. Verify
    against the installed tensorflow_transform version.
    """
    from tensorflow_transform.tf_metadata import dataset_metadata
    from tensorflow_transform.tf_metadata import dataset_schema

    def _fixed_string_column():
        # Both input columns share this exact definition.
        return dataset_schema.ColumnSchema(
            tf.string, [], dataset_schema.FixedColumnRepresentation())

    columns = {name: _fixed_string_column() for name in ('id', 'text')}
    return dataset_metadata.DatasetMetadata(dataset_schema.Schema(columns))


def preprocess_fn(input_features):
    """tf.Transform preprocessing function.

    Args:
        input_features: mapping with ``'id'`` and ``'text'`` entries (the
            columns declared by ``get_metadata``).

    Returns:
        dict with ``'id'`` passed through unchanged and ``'embedding'`` set to
        ``embed_text`` applied to ``'text'``.
    """
    embedding = embed_text(input_features['text'])
    return {'id': input_features['id'], 'embedding': embedding}


def run(pipeline_options, known_args):
    """Build and run the embedding pipeline.

    Creates four example texts in memory and embeds them through
    ``preprocess_fn`` with ``tft_beam.AnalyzeAndTransformDataset``. The
    transformed examples are written to a single TFRecord shard.

    Args:
        pipeline_options: ``PipelineOptions`` built by the caller. They are
            used as-is. Previously they were overwritten by re-parsing
            ``sys.argv``, which dropped settings such as ``save_main_session``.
        known_args: parsed script flags. ``known_args.output_dir`` is the
            output file prefix.

    Raises:
        ValueError: if ``known_args.output_dir`` is not set.
    """
    output_prefix = known_args.output_dir
    if not output_prefix:
        # '{0}'.format(None) would silently yield the literal prefix 'None'.
        raise ValueError('--output_dir is required')

    pipeline = beam.Pipeline(options=pipeline_options)
    # NOTE(review): known_args.transform_temp_dir / transform_export_dir are
    # accepted by the launcher but not used here; tf.Transform still gets a
    # fresh temp dir and transform_fn is not written out.
    with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
        articles = pipeline | beam.Create([
            {'id': '01', 'text': 'To be, or not to be: that is the question: '},
            {'id': '02', 'text': "Whether 'tis nobler in the mind to suffer "},
            {'id': '03', 'text': 'The slings and arrows of outrageous fortune, '},
            {'id': '04', 'text': 'Or to take arms against a sea of troubles, '},
        ])

        articles_dataset = (articles, get_metadata())

        transformed_dataset, transform_fn = (
            articles_dataset
            | 'Extract embeddings' >> tft_beam.AnalyzeAndTransformDataset(
                preprocess_fn))

        transformed_data, transformed_metadata = transformed_dataset

        _ = (
            transformed_data
            | 'Write embeddings to TFRecords' >> beam.io.tfrecordio.WriteToTFRecord(
                file_path_prefix=output_prefix,
                file_name_suffix='.tfrecords',
                coder=tft_coders.example_proto_coder.ExampleProtoCoder(
                    transformed_metadata.schema),
                num_shards=1))

    result = pipeline.run()
    result.wait_until_finished()

It is called with:

import argparse
import logging
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

from etl import pipeline_local_minimal as pipeline


def get_args(argv):
  """Split command-line tokens into this script's flags and the rest.

  Args:
    argv: list of command-line tokens, or None to let argparse read
      ``sys.argv[1:]``.

  Returns:
    A ``(known_args, pipeline_args)`` tuple. ``known_args`` is an
    ``argparse.Namespace`` holding the flags declared below. ``pipeline_args``
    is the list of unrecognized tokens, left for ``PipelineOptions``.
  """
  # (flag, add_argument keyword arguments), in the order they appear in --help.
  flag_specs = (
      ('--output_dir',
       {'help': 'A directory location of output embeddings'}),
      ('--enable_debug',
       {'action': 'store_true', 'help': 'Enable debug options.'}),
      ('--debug_output_prefix',
       {'help': 'Specify prefix of debug output.'}),
      ('--transform_temp_dir',
       {'default': 'tft_temp',
        'help': 'A temp directory used by tf.transform.'}),
      ('--transform_export_dir',
       {'default': 'tft_out',
        'help': 'A directory where tft function is saved'}),
  )

  parser = argparse.ArgumentParser()
  for flag, options in flag_specs:
    parser.add_argument(flag, **options)
  return parser.parse_known_args(argv)


def main(argv=None):
  """Parse flags, enable ``save_main_session`` and launch the pipeline.

  Args:
    argv: command-line tokens, or None to use ``sys.argv[1:]``.
  """
  known_args, pipeline_args = get_args(argv)
  options = PipelineOptions(pipeline_args)
  # Set the flag on the SetupOptions view of the same options object.
  options.view_as(SetupOptions).save_main_session = True
  pipeline.run(options, known_args)


if __name__ == '__main__':
  # Raise the root logger's threshold to ERROR before running. Note that this
  # hides warning-level messages, which can be useful when debugging.
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.ERROR)
  main()

and launched with the following shell script:

# Configurable parameters
ROOT_DIR="./data"

# Datastore parameters
KIND="wikipedia"

# Directory for output data files
OUTPUT_PREFIX="${ROOT_DIR}/${KIND}/embeddings/embed"

# Working directories for Dataflow
DF_JOB_DIR="${ROOT_DIR}/${KIND}/dataflow"
STAGING_LOCATION="${DF_JOB_DIR}/staging"
TEMP_LOCATION="${DF_JOB_DIR}/temp"

# Working directories for tf.transform
TRANSFORM_ROOT_DIR="${DF_JOB_DIR}/transform"
TRANSFORM_TEMP_DIR="${TRANSFORM_ROOT_DIR}/temp"
TRANSFORM_EXPORT_DIR="${TRANSFORM_ROOT_DIR}/export"

# Working directories for Debug log
DEBUG_OUTPUT_PREFIX="${DF_JOB_DIR}/debug/log"

# Running Config for Dataflow
RUNNER=DirectRunner

# Cleaning working and output directories before running the Dataflow job
#echo "Cleaning working and output directories..."
#rm -r "${DF_JOB_DIR}"
#rm -r "${OUTPUT_PREFIX}"

echo "Running the Dataflow job..."

# Command to run the Dataflow job.
# NOTE(review): the get_args() shown for run_local.py does not declare --kind,
# so it is forwarded to Beam's PipelineOptions as an extra argument. Confirm
# that it is needed.
python run_local.py \
  --output_dir="${OUTPUT_PREFIX}" \
  --transform_temp_dir="${TRANSFORM_TEMP_DIR}" \
  --transform_export_dir="${TRANSFORM_EXPORT_DIR}" \
  --runner="${RUNNER}" \
  --kind="${KIND}" \
  --staging_location="${STAGING_LOCATION}" \
  --temp_location="${TEMP_LOCATION}" \
  --setup_file="$(pwd)/setup.py" \
  --enable_debug \
  --debug_output_prefix="${DEBUG_OUTPUT_PREFIX}"
status=$?

# Report success only if the job actually exited cleanly; otherwise surface
# the failure and propagate python's exit status to the caller.
if [ "${status}" -ne 0 ]; then
  echo "Dataflow job failed (exit status ${status})." >&2
  exit "${status}"
fi

echo "Dataflow job submitted successfully!"

Error:

...
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 655, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 880, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/common.py", line 895, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tensorflow_transform/beam/impl.py", line 338, in process
    lambda: self._make_graph_state(saved_model_dir))
  File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tfx_bsl/beam/shared.py", line 221, in acquire
    return _shared_map.acquire(self._key, constructor_fn)
  File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tfx_bsl/beam/shared.py", line 184, in acquire
    result = control_block.acquire(constructor_fn)
  File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tfx_bsl/beam/shared.py", line 87, in acquire
    result = constructor_fn()
  File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tensorflow_transform/beam/impl.py", line 338, in <lambda>
    lambda: self._make_graph_state(saved_model_dir))
  File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tensorflow_transform/beam/impl.py", line 314, in _make_graph_state
    self._exclude_outputs, self._tf_config)
  File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tensorflow_transform/beam/impl.py", line 232, in __init__
    tensor_inputs = graph_tools.get_dependent_inputs(graph, inputs, fetches)
  File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tensorflow_transform/graph_tools.py", line 690, in get_dependent_inputs
    sink_tensors_ready)
  File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tensorflow_transform/graph_tools.py", line 503, in __init__
    table_init_op, graph_analyzer_for_table_init, translate_path_fn)
  File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tensorflow_transform/graph_tools.py", line 564, in _get_table_init_op_source_info
    if table_init_op.type not in _TABLE_INIT_OP_TYPES:
AttributeError: 'Tensor' object has no attribute 'type'

It looks like the graph analyzer (`_get_table_init_op_source_info` in `tensorflow_transform/graph_tools.py`) expects each table initializer to be an op with a `type` attribute, but it is being passed a `Tensor` instead. It is unclear to me what is going wrong here. Any help would be greatly appreciated!

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions