Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,16 @@ public Builder addValues(Object ... values) {
return addValues(Arrays.asList(values));
}

public Builder addArray(List<Object> values) {
this.values.add(values);
return this;
}

public Builder addArray(Object ... values) {
addArray(Arrays.asList(values));
return this;
}

public Row build() {
checkNotNull(type);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ public Builder withTimestampField(String fieldName) {
return withField(fieldName, SqlTypeCoders.TIMESTAMP);
}

public Builder withArrayField(String fieldName, SqlTypeCoder elementCoder) {
return withField(fieldName, SqlTypeCoders.arrayOf(elementCoder));
}

private Builder() {
this.fields = ImmutableList.builder();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Objects;
import org.apache.beam.sdk.coders.BigDecimalCoder;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.ByteCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;

/**
Expand Down Expand Up @@ -55,15 +57,19 @@ public void verifyDeterministic() throws NonDeterministicException {
@Override
public boolean equals(Object other) {
return other != null && this.getClass().equals(other.getClass());

}

@Override
public int hashCode() {
return this.getClass().hashCode();
}

public static boolean isArray(SqlTypeCoder sqlTypeCoder) {
return sqlTypeCoder instanceof SqlArrayCoder;
}

static class SqlTinyIntCoder extends SqlTypeCoder {

@Override
protected Coder delegateCoder() {
return ByteCoder.of();
Expand Down Expand Up @@ -153,4 +159,43 @@ protected Coder delegateCoder() {
return RowHelper.DateCoder.of();
}
}

/**
* Represents SQL ARRAY type.
*
* <p>Delegates to {#code elementCoder} to encode elements.
*/
public static class SqlArrayCoder extends SqlTypeCoder {

private SqlTypeCoder elementCoder;

private SqlArrayCoder(SqlTypeCoder elementCoder) {
this.elementCoder = elementCoder;
}

public static SqlArrayCoder of(SqlTypeCoder elementCoder) {
return new SqlArrayCoder(elementCoder);
}

@Override
protected Coder delegateCoder() {
return ListCoder.of(elementCoder);
}

public SqlTypeCoder getElementCoder() {
return elementCoder;
}

@Override
public boolean equals(Object other) {
return other != null
&& this.getClass().equals(other.getClass())
&& this.elementCoder.equals(((SqlArrayCoder) other).elementCoder);
}

@Override
public int hashCode() {
return Objects.hashCode(elementCoder);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import com.google.common.collect.ImmutableSet;
import java.util.Set;
import org.apache.beam.sdk.extensions.sql.SqlTypeCoder.SqlArrayCoder;
import org.apache.beam.sdk.extensions.sql.SqlTypeCoder.SqlIntegerCoder;

/**
Expand All @@ -55,6 +56,10 @@ public class SqlTypeCoders {
public static final SqlTypeCoder DATE = new SqlDateCoder();
public static final SqlTypeCoder TIMESTAMP = new SqlTimestampCoder();

public static SqlTypeCoder arrayOf(SqlTypeCoder elementCoder) {
return SqlArrayCoder.of(elementCoder);
}

public static final Set<SqlTypeCoder> NUMERIC_TYPES =
ImmutableSet.of(
SqlTypeCoders.TINYINT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,28 +31,53 @@
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowEndExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowStartExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlDivideExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlMinusExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic
.BeamSqlDivideExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic
.BeamSqlMinusExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlModExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlMultiplyExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlPlusExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlEqualsExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlGreaterThanExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlGreaterThanOrEqualsExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlIsNotNullExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlIsNullExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlLessThanExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlLessThanOrEqualsExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlNotEqualsExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentDateExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentTimeExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentTimestampExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic
.BeamSqlMultiplyExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic
.BeamSqlPlusExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.array.BeamSqlArrayExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.array
.BeamSqlArrayItemExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.collection
.BeamSqlCardinalityExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.collection
.BeamSqlSingleElementExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison
.BeamSqlEqualsExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison
.BeamSqlGreaterThanExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison
.BeamSqlGreaterThanOrEqualsExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison
.BeamSqlIsNotNullExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison
.BeamSqlIsNullExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison
.BeamSqlLessThanExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison
.BeamSqlLessThanOrEqualsExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison
.BeamSqlNotEqualsExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date
.BeamSqlCurrentDateExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date
.BeamSqlCurrentTimeExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date
.BeamSqlCurrentTimestampExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateCeilExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateFloorExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDatetimeMinusExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDatetimePlusExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date
.BeamSqlDatetimeMinusExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date
.BeamSqlDatetimePlusExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlExtractExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlIntervalMultiplyExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date
.BeamSqlIntervalMultiplyExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlAndExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlNotExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlOrExpression;
Expand All @@ -73,20 +98,25 @@
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlPowerExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlRadiansExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlRandExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlRandIntegerExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math
.BeamSqlRandIntegerExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlRoundExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlSignExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlSinExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlTanExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlTruncateExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.reinterpret.BeamSqlReinterpretExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlCharLengthExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.reinterpret
.BeamSqlReinterpretExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string
.BeamSqlCharLengthExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlConcatExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlInitCapExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlLowerExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlOverlayExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlPositionExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlSubstringExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string
.BeamSqlPositionExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string
.BeamSqlSubstringExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlTrimExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlUpperExpression;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamFilterRel;
Expand Down Expand Up @@ -384,6 +414,20 @@ static BeamSqlExpression buildExpression(RexNode rexNode) {
case "DATETIME_PLUS":
return new BeamSqlDatetimePlusExpression(subExps);

// array functions
case "ARRAY":
return new BeamSqlArrayExpression(subExps);

case "ITEM":
return new BeamSqlArrayItemExpression(subExps, node.type.getSqlTypeName());

// collections functions
case "ELEMENT":
return new BeamSqlSingleElementExpression(subExps, node.type.getSqlTypeName());

case "CARDINALITY":
return new BeamSqlCardinalityExpression(subExps, node.type.getSqlTypeName());

//DEFAULT keyword for UDF with optional parameter
case "DEFAULT":
return new BeamSqlDefaultExpression();
Expand Down Expand Up @@ -424,9 +468,10 @@ static BeamSqlExpression buildExpression(RexNode rexNode) {
ScalarFunctionImpl fn = (ScalarFunctionImpl) udf.getFunction();
ret = new BeamSqlUdfExpression(fn.method, subExps,
((RexCall) rexNode).type.getSqlTypeName());
} else {
throw new UnsupportedOperationException("Operator: " + opName + " is not supported yet!");
}
} else {
throw new UnsupportedOperationException(
"Operator: " + opName + " is not supported yet");
}
}
} else {
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ public boolean accept() {
case SYMBOL:
// for SYMBOL, it supports anything...
return true;
case ARRAY:
return value instanceof List;
default:
throw new UnsupportedOperationException(outputType.name());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.array;

import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.Row;
import org.apache.calcite.sql.type.SqlTypeName;

/**
* Represents ARRAY expression in SQL.
*/
public class BeamSqlArrayExpression extends BeamSqlExpression {
public BeamSqlArrayExpression(List<BeamSqlExpression> operands) {
super(operands, SqlTypeName.ARRAY);
}

@Override
public boolean accept() {
return
operands
.stream()
.map(BeamSqlExpression::getOutputType)
.distinct()
.count() == 1;
}

@Override
public BeamSqlPrimitive evaluate(Row inputRow, BoundedWindow window) {
List<Object> elements =
operands
.stream()
.map(op -> op.evaluate(inputRow, window).getValue())
.collect(Collectors.toList());

return BeamSqlPrimitive.of(outputType, elements);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.array;

import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.Row;
import org.apache.calcite.sql.type.SqlTypeName;

/**
* Implements array element access expression.
*/
public class BeamSqlArrayItemExpression extends BeamSqlExpression {

public BeamSqlArrayItemExpression(
List<BeamSqlExpression> operands,
SqlTypeName sqlTypeName) {

super(operands, sqlTypeName);
}

@Override
public boolean accept() {
return operands.size() == 2;
}

@Override
public BeamSqlPrimitive evaluate(Row inputRow, BoundedWindow window) {
List<Object> array = opValueEvaluated(0, inputRow, window);
Integer index = opValueEvaluated(1, inputRow, window);

return BeamSqlPrimitive.of(outputType, array.get(index));
}
}
Loading