-
Notifications
You must be signed in to change notification settings - Fork 223
Closed
Description
I am getting an error when running the following code with the Beam DirectRunner.
code:
import tensorflow as tf
import apache_beam as beam
import tensorflow_transform.beam as tft_beam
import tensorflow_transform.coders as tft_coders
from apache_beam.options.pipeline_options import PipelineOptions
import tempfile
model = None


def embed_text(text):
    """Return Universal Sentence Encoder embeddings for ``text``.

    ``tensorflow_hub`` is imported inside the function on every call. The
    Hub model itself is loaded only on the first call and then cached in the
    module-level ``model`` global for reuse.

    NOTE(review): the cached model is created in whatever graph/context is
    active on the first call; if ``preprocess_fn`` is traced more than once
    (e.g. into separate graphs), reusing that object may not be valid —
    confirm against how tf.Transform traces the preprocessing function.
    """
    import tensorflow_hub as hub
    global model
    if model is not None:
        # Fast path: reuse the already-loaded encoder.
        return model(text)
    model = hub.load(
        'https://tfhub.dev/google/universal-sentence-encoder/4')
    return model(text)
def get_metadata():
    """Describe the raw input records for tf.Transform.

    Returns:
      A ``DatasetMetadata`` whose schema has two fixed-length scalar
      ``tf.string`` columns, ``id`` and ``text``.
    """
    from tensorflow_transform.tf_metadata import dataset_schema
    from tensorflow_transform.tf_metadata import dataset_metadata

    def _scalar_string_column():
        # Shape [] with a fixed representation: one string value per record.
        return dataset_schema.ColumnSchema(
            tf.string, [], dataset_schema.FixedColumnRepresentation())

    columns = {name: _scalar_string_column() for name in ('id', 'text')}
    return dataset_metadata.DatasetMetadata(dataset_schema.Schema(columns))
def preprocess_fn(input_features):
    """tf.Transform preprocessing function.

    Args:
      input_features: mapping with ``'id'`` and ``'text'`` entries.

    Returns:
      A dict with ``'id'`` passed through unchanged and ``'embedding'`` set to
      the result of ``embed_text`` on the ``'text'`` entry.
    """
    embedding = embed_text(input_features['text'])
    return {'id': input_features['id'], 'embedding': embedding}
def run(pipeline_options, known_args):
    """Build and run the embedding pipeline.

    The pipeline:
      1. Creates four hard-coded text records.
      2. Embeds them with ``preprocess_fn`` through
         ``tft_beam.AnalyzeAndTransformDataset``.
      3. Writes the transformed examples as a single TFRecord shard.

    Args:
      pipeline_options: ``PipelineOptions`` built by the caller. They are used
        as-is so that settings the caller made (e.g. ``save_main_session``)
        take effect.
      known_args: parsed flags. ``output_dir`` is the output file prefix and
        is required. ``transform_temp_dir``, when set, is used as the
        tf.Transform temp directory; otherwise a fresh temp dir is created.

    Raises:
      ValueError: if ``known_args.output_dir`` is not set.
    """
    # Fail early instead of formatting None into a literal "None" file prefix.
    if not known_args.output_dir:
        raise ValueError('--output_dir is required')
    temp_dir = (getattr(known_args, 'transform_temp_dir', None)
                or tempfile.mkdtemp())
    # Use the caller's options rather than re-parsing sys.argv, which would
    # discard anything the caller configured programmatically.
    pipeline = beam.Pipeline(options=pipeline_options)
    with tft_beam.Context(temp_dir=temp_dir):
        articles = (
            pipeline
            | beam.Create([
                {'id': '01', 'text': 'To be, or not to be: that is the question: '},
                {'id': '02', 'text': "Whether 'tis nobler in the mind to suffer "},
                {'id': '03', 'text': 'The slings and arrows of outrageous fortune, '},
                {'id': '04', 'text': 'Or to take arms against a sea of troubles, '},
            ]))
        articles_dataset = (articles, get_metadata())
        transformed_dataset, transform_fn = (
            articles_dataset
            | 'Extract embeddings' >> tft_beam.AnalyzeAndTransformDataset(
                preprocess_fn))
        transformed_data, transformed_metadata = transformed_dataset
        _ = (
            transformed_data
            | 'Write embeddings to TFRecords' >> beam.io.tfrecordio.WriteToTFRecord(
                file_path_prefix=known_args.output_dir,
                file_name_suffix='.tfrecords',
                coder=tft_coders.example_proto_coder.ExampleProtoCoder(
                    transformed_metadata.schema),
                num_shards=1,
            )
        )
    result = pipeline.run()
    result.wait_until_finished()
# called with:
import argparse
import logging
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from etl import pipeline_local_minimal as pipeline
def get_args(argv):
    """Split ``argv`` into this script's flags and the remaining Beam flags.

    Returns:
      ``(known_args, pipeline_args)``: the parsed ``argparse.Namespace`` and
      the list of arguments argparse did not recognise.
    """
    flag_specs = (
        ('--output_dir',
         dict(help='A directory location of output embeddings')),
        ('--enable_debug',
         dict(action='store_true', help='Enable debug options.')),
        ('--debug_output_prefix',
         dict(help='Specify prefix of debug output.')),
        ('--transform_temp_dir',
         dict(default='tft_temp',
              help='A temp directory used by tf.transform.')),
        ('--transform_export_dir',
         dict(default='tft_out',
              help='A directory where tft function is saved')),
    )
    parser = argparse.ArgumentParser()
    for flag, options in flag_specs:
        parser.add_argument(flag, **options)
    return parser.parse_known_args(argv)
def main(argv=None):
    """Parse command-line flags and launch the embedding pipeline.

    Flags this script defines go to ``known_args``. Everything else is
    forwarded to Beam as ``PipelineOptions``.
    """
    known_args, beam_argv = get_args(argv)
    options = PipelineOptions(beam_argv)
    # Enable Beam's save_main_session setup option before handing the
    # options to the pipeline.
    options.view_as(SetupOptions).save_main_session = True
    pipeline.run(options, known_args)
if __name__ == '__main__':
    root_logger = logging.getLogger()
    # Only let ERROR-level (and above) records through the root logger.
    root_logger.setLevel(logging.ERROR)
    main()
# and
# Configurable parameters
ROOT_DIR="./data"
# Datastore parameters
KIND="wikipedia"
# Directory for output data files
OUTPUT_PREFIX="${ROOT_DIR}/${KIND}/embeddings/embed"
# Working directories for Dataflow
DF_JOB_DIR="${ROOT_DIR}/${KIND}/dataflow"
STAGING_LOCATION="${DF_JOB_DIR}/staging"
TEMP_LOCATION="${DF_JOB_DIR}/temp"
# Working directories for tf.transform
TRANSFORM_ROOT_DIR="${DF_JOB_DIR}/transform"
TRANSFORM_TEMP_DIR="${TRANSFORM_ROOT_DIR}/temp"
TRANSFORM_EXPORT_DIR="${TRANSFORM_ROOT_DIR}/export"
# Working directories for Debug log
DEBUG_OUTPUT_PREFIX="${DF_JOB_DIR}/debug/log"
# Running Config for Dataflow
RUNNER=DirectRunner
# Cleaning working and output directories before running the Dataflow job
#echo "Cleaning working and output directories..."
#rm -r "${DF_JOB_DIR}"
#rm -r "${OUTPUT_PREFIX}"
echo "Running the Dataflow job..."
# Command to run the Dataflow job. Only report success if it actually
# succeeded; otherwise propagate its exit status.
if python run_local.py \
    --output_dir="${OUTPUT_PREFIX}" \
    --transform_temp_dir="${TRANSFORM_TEMP_DIR}" \
    --transform_export_dir="${TRANSFORM_EXPORT_DIR}" \
    --runner="${RUNNER}" \
    --kind="${KIND}" \
    --staging_location="${STAGING_LOCATION}" \
    --temp_location="${TEMP_LOCATION}" \
    --setup_file="$(pwd)/setup.py" \
    --enable_debug \
    --debug_output_prefix="${DEBUG_OUTPUT_PREFIX}"
then
  echo "Dataflow job submitted successfully!"
else
  status=$?
  echo "Dataflow job failed (exit status ${status})." >&2
  exit "${status}"
fi
# error:
...
Traceback (most recent call last):
File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 655, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "apache_beam/runners/common.py", line 880, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/common.py", line 895, in apache_beam.runners.common._OutputProcessor.process_outputs
File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tensorflow_transform/beam/impl.py", line 338, in process
lambda: self._make_graph_state(saved_model_dir))
File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tfx_bsl/beam/shared.py", line 221, in acquire
return _shared_map.acquire(self._key, constructor_fn)
File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tfx_bsl/beam/shared.py", line 184, in acquire
result = control_block.acquire(constructor_fn)
File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tfx_bsl/beam/shared.py", line 87, in acquire
result = constructor_fn()
File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tensorflow_transform/beam/impl.py", line 338, in <lambda>
lambda: self._make_graph_state(saved_model_dir))
File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tensorflow_transform/beam/impl.py", line 314, in _make_graph_state
self._exclude_outputs, self._tf_config)
File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tensorflow_transform/beam/impl.py", line 232, in __init__
tensor_inputs = graph_tools.get_dependent_inputs(graph, inputs, fetches)
File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tensorflow_transform/graph_tools.py", line 690, in get_dependent_inputs
sink_tensors_ready)
File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tensorflow_transform/graph_tools.py", line 503, in __init__
table_init_op, graph_analyzer_for_table_init, translate_path_fn)
File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tensorflow_transform/graph_tools.py", line 564, in _get_table_init_op_source_info
if table_init_op.type not in _TABLE_INIT_OP_TYPES:
AttributeError: 'Tensor' object has no attribute 'type'
It looks like the graph analyzer (`graph_tools._get_table_init_op_source_info`) expects a table-initializer op, which has a `type` attribute, but is being passed a `Tensor` instead. It is unclear to me what is going wrong here. Any help would be greatly appreciated!