diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/array/BeamSqlArrayItemExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/array/BeamSqlArrayItemExpression.java index 258b2554b603..fa9f6429ec54 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/array/BeamSqlArrayItemExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/array/BeamSqlArrayItemExpression.java @@ -44,6 +44,7 @@ public BeamSqlPrimitive evaluate( List array = (List) opValueEvaluated(0, inputRow, window, env); Integer index = (Integer) opValueEvaluated(1, inputRow, window, env); - return BeamSqlPrimitive.of(outputType, array.get(index)); + // SQL array indexing is 1 based + return BeamSqlPrimitive.of(outputType, array.get(index - 1)); } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java index c80fb51022e9..e1fd4aed444c 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java @@ -128,7 +128,7 @@ public void testArrayWithRow() { PCollection stream = BeamSqlRelUtils.toPCollection( pipeline, - sqlEnv.parseQuery("SELECT arrayWithRowTestTable.col[0] FROM arrayWithRowTestTable")); + sqlEnv.parseQuery("SELECT arrayWithRowTestTable.col[1] FROM arrayWithRowTestTable")); PAssert.that(stream) .containsInAnyOrder(Row.withSchema(innerRowSchema).addValues("str", 1L).build()); pipeline.run().waitUntilFinish(Duration.standardMinutes(2)); @@ -141,7 +141,7 @@ public void testNestedArray() { BeamSqlRelUtils.toPCollection( pipeline, sqlEnv.parseQuery( - "SELECT nestedArrayTestTable.col[0][2], nestedArrayTestTable.col[1][0] FROM nestedArrayTestTable")); + "SELECT nestedArrayTestTable.col[1][3], nestedArrayTestTable.col[2][1] FROM nestedArrayTestTable")); PAssert.that(stream) .containsInAnyOrder( Row.withSchema(Schema.builder().addInt64Field("field1").addInt64Field("field2").build()) @@ -168,7 +168,7 @@ public void testRowWithArray() { BeamSqlRelUtils.toPCollection( pipeline, sqlEnv.parseQuery( - "SELECT rowWithArrayTestTable.col.field3[1] FROM rowWithArrayTestTable")); + "SELECT rowWithArrayTestTable.col.field3[2] FROM rowWithArrayTestTable")); PAssert.that(stream) .containsInAnyOrder( Row.withSchema(Schema.builder().addInt64Field("int64").build()).addValue(6L).build()); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslArrayTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslArrayTest.java index 42b9a0397d44..eb90274cc5d1 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslArrayTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslArrayTest.java @@ -98,7 +98,7 @@ public void testAccessArrayElement() { Schema resultType = Schema.builder().addStringField("f_arrElem").build(); PCollection result = - input.apply("sqlQuery", SqlTransform.query("SELECT f_stringArr[0] FROM PCOLLECTION")); + input.apply("sqlQuery", SqlTransform.query("SELECT f_stringArr[1] FROM PCOLLECTION")); PAssert.that(result) .containsInAnyOrder( @@ -348,7 +348,7 @@ public void testSelectSingleRowFromArrayOfRows() { PCollection result = input - .apply(SqlTransform.query("SELECT f_arrayOfRows[1] FROM PCOLLECTION")) + .apply(SqlTransform.query("SELECT f_arrayOfRows[2] FROM PCOLLECTION")) .setRowSchema(resultSchema); PAssert.that(result) @@ -394,7 +394,7 @@ public void testSelectRowFieldFromArrayOfRows() { PCollection result = input - .apply(SqlTransform.query("SELECT f_arrayOfRows[1].f_rowString FROM PCOLLECTION")) + .apply(SqlTransform.query("SELECT f_arrayOfRows[2].f_rowString FROM PCOLLECTION")) .setRowSchema(resultSchema); PAssert.that(result) diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslNestedRowsTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslNestedRowsTest.java index ec5dc32a77ad..2f1f3c09a2da 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslNestedRowsTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslNestedRowsTest.java @@ -258,7 +258,7 @@ public void testNestedRowArrayElementAccess() { input .apply( SqlTransform.query( - "SELECT `PCOLLECTION`.`f_nestedRow`.`f_nestedArray`[1] FROM PCOLLECTION")) + "SELECT `PCOLLECTION`.`f_nestedRow`.`f_nestedArray`[2] FROM PCOLLECTION")) .setRowSchema(resultSchema); PAssert.that(result) diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/array/BeamSqlArrayItemExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/array/BeamSqlArrayItemExpressionTest.java index 2180220bac0b..caad5e9cc117 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/array/BeamSqlArrayItemExpressionTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/array/BeamSqlArrayItemExpressionTest.java @@ -43,7 +43,7 @@ public void testAccessesElement0() { List input = ImmutableList.of( BeamSqlPrimitive.of(SqlTypeName.ARRAY, Arrays.asList("aaa", "bbb")), - BeamSqlPrimitive.of(SqlTypeName.INTEGER, 0)); + BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); BeamSqlArrayItemExpression expression = new BeamSqlArrayItemExpression(input, SqlTypeName.VARCHAR); @@ -60,7 +60,7 @@ public void testAccessesElement1() { List input = ImmutableList.of( BeamSqlPrimitive.of(SqlTypeName.ARRAY, Arrays.asList("aaa", "bbb")), - BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); + BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2)); BeamSqlArrayItemExpression expression = new BeamSqlArrayItemExpression(input, SqlTypeName.VARCHAR); @@ -77,7 +77,7 @@ public void testAcceptsTwoOperands() { List input = ImmutableList.of( BeamSqlPrimitive.of(SqlTypeName.ARRAY, Arrays.asList("aaa", "bbb")), - BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); + BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2)); BeamSqlArrayItemExpression expression = new BeamSqlArrayItemExpression(input, SqlTypeName.VARCHAR);