Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public BeamSqlPrimitive evaluate(
List<Object> array = (List) opValueEvaluated(0, inputRow, window, env);
Integer index = (Integer) opValueEvaluated(1, inputRow, window, env);

return BeamSqlPrimitive.of(outputType, array.get(index));
// SQL array indexing is 1 based
return BeamSqlPrimitive.of(outputType, array.get(index - 1));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void testArrayWithRow() {
PCollection<Row> stream =
BeamSqlRelUtils.toPCollection(
pipeline,
sqlEnv.parseQuery("SELECT arrayWithRowTestTable.col[0] FROM arrayWithRowTestTable"));
sqlEnv.parseQuery("SELECT arrayWithRowTestTable.col[1] FROM arrayWithRowTestTable"));
PAssert.that(stream)
.containsInAnyOrder(Row.withSchema(innerRowSchema).addValues("str", 1L).build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
Expand All @@ -141,7 +141,7 @@ public void testNestedArray() {
BeamSqlRelUtils.toPCollection(
pipeline,
sqlEnv.parseQuery(
"SELECT nestedArrayTestTable.col[0][2], nestedArrayTestTable.col[1][0] FROM nestedArrayTestTable"));
"SELECT nestedArrayTestTable.col[1][3], nestedArrayTestTable.col[2][1] FROM nestedArrayTestTable"));
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(Schema.builder().addInt64Field("field1").addInt64Field("field2").build())
Expand All @@ -168,7 +168,7 @@ public void testRowWithArray() {
BeamSqlRelUtils.toPCollection(
pipeline,
sqlEnv.parseQuery(
"SELECT rowWithArrayTestTable.col.field3[1] FROM rowWithArrayTestTable"));
"SELECT rowWithArrayTestTable.col.field3[2] FROM rowWithArrayTestTable"));
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(Schema.builder().addInt64Field("int64").build()).addValue(6L).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void testAccessArrayElement() {
Schema resultType = Schema.builder().addStringField("f_arrElem").build();

PCollection<Row> result =
input.apply("sqlQuery", SqlTransform.query("SELECT f_stringArr[0] FROM PCOLLECTION"));
input.apply("sqlQuery", SqlTransform.query("SELECT f_stringArr[1] FROM PCOLLECTION"));

PAssert.that(result)
.containsInAnyOrder(
Expand Down Expand Up @@ -348,7 +348,7 @@ public void testSelectSingleRowFromArrayOfRows() {

PCollection<Row> result =
input
.apply(SqlTransform.query("SELECT f_arrayOfRows[1] FROM PCOLLECTION"))
.apply(SqlTransform.query("SELECT f_arrayOfRows[2] FROM PCOLLECTION"))
.setRowSchema(resultSchema);

PAssert.that(result)
Expand Down Expand Up @@ -394,7 +394,7 @@ public void testSelectRowFieldFromArrayOfRows() {

PCollection<Row> result =
input
.apply(SqlTransform.query("SELECT f_arrayOfRows[1].f_rowString FROM PCOLLECTION"))
.apply(SqlTransform.query("SELECT f_arrayOfRows[2].f_rowString FROM PCOLLECTION"))
.setRowSchema(resultSchema);

PAssert.that(result)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ public void testNestedRowArrayElementAccess() {
input
.apply(
SqlTransform.query(
"SELECT `PCOLLECTION`.`f_nestedRow`.`f_nestedArray`[1] FROM PCOLLECTION"))
"SELECT `PCOLLECTION`.`f_nestedRow`.`f_nestedArray`[2] FROM PCOLLECTION"))
.setRowSchema(resultSchema);

PAssert.that(result)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void testAccessesElement0() {
List<BeamSqlExpression> input =
ImmutableList.of(
BeamSqlPrimitive.of(SqlTypeName.ARRAY, Arrays.asList("aaa", "bbb")),
BeamSqlPrimitive.of(SqlTypeName.INTEGER, 0));
BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));

BeamSqlArrayItemExpression expression =
new BeamSqlArrayItemExpression(input, SqlTypeName.VARCHAR);
Expand All @@ -60,7 +60,7 @@ public void testAccessesElement1() {
List<BeamSqlExpression> input =
ImmutableList.of(
BeamSqlPrimitive.of(SqlTypeName.ARRAY, Arrays.asList("aaa", "bbb")),
BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));

BeamSqlArrayItemExpression expression =
new BeamSqlArrayItemExpression(input, SqlTypeName.VARCHAR);
Expand All @@ -77,7 +77,7 @@ public void testAcceptsTwoOperands() {
List<BeamSqlExpression> input =
ImmutableList.of(
BeamSqlPrimitive.of(SqlTypeName.ARRAY, Arrays.asList("aaa", "bbb")),
BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));

BeamSqlArrayItemExpression expression =
new BeamSqlArrayItemExpression(input, SqlTypeName.VARCHAR);
Expand Down