## Example

### Union

```java
Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "a, b, c");
Table result = left.union(right);
```
• The `union` method is similar to SQL `UNION`: it combines the rows of both tables and removes duplicates.

### UnionAll

```java
Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "a, b, c");
Table result = left.unionAll(right);
```
• The `unionAll` method is similar to SQL `UNION ALL`: it combines the rows of both tables and keeps duplicates.
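
The difference between the two variants can be sketched with plain Java collections. This is only an illustration of the SQL semantics, not Flink's implementation; `UnionSketch` and its methods are hypothetical names, and rows are modeled as plain strings.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

// Hypothetical helper, not part of Flink: each row is modeled as a String.
final class UnionSketch {
    // UNION ALL: concatenate both inputs and keep every duplicate.
    static List<String> unionAll(List<String> left, List<String> right) {
        List<String> out = new ArrayList<>(left);
        out.addAll(right);
        return out;
    }

    // UNION: concatenate, then keep only the first occurrence of each row.
    static List<String> union(List<String> left, List<String> right) {
        return new ArrayList<>(new LinkedHashSet<>(unionAll(left, right)));
    }

    public static void main(String[] args) {
        List<String> left = List.of("a", "b", "b");
        List<String> right = List.of("b", "c");
        System.out.println(union(left, right));    // [a, b, c]
        System.out.println(unionAll(left, right)); // [a, b, b, b, c]
    }
}
```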

### Intersect

```java
Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "d, e, f");
Table result = left.intersect(right);
```
• The `intersect` method is similar to SQL `INTERSECT`: it returns the rows that exist in both tables, each exactly once.

### IntersectAll

```java
Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "d, e, f");
Table result = left.intersectAll(right);
```
• The `intersectAll` method is similar to SQL `INTERSECT ALL`: a row that occurs several times in both tables is returned as many times as it occurs in both.
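
As a rough illustration of the two intersect variants, the sketch below applies the usual SQL multiset rule: a row occurring m times on the left and n times on the right appears min(m, n) times under `INTERSECT ALL` and once under `INTERSECT`. `IntersectSketch` is a hypothetical name and rows are modeled as strings; this is not how Flink executes the operation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink: each row is modeled as a String.
final class IntersectSketch {
    // INTERSECT ALL: each left row is kept while the right side still has
    // an unmatched copy of it, giving min(m, n) copies per distinct row.
    static List<String> intersectAll(List<String> left, List<String> right) {
        Map<String, Integer> remaining = new HashMap<>();
        for (String row : right) {
            remaining.merge(row, 1, Integer::sum);
        }
        List<String> out = new ArrayList<>();
        for (String row : left) {
            int n = remaining.getOrDefault(row, 0);
            if (n > 0) {
                out.add(row);
                remaining.put(row, n - 1);
            }
        }
        return out;
    }

    // INTERSECT: the distinct rows of the INTERSECT ALL result.
    static List<String> intersect(List<String> left, List<String> right) {
        return new ArrayList<>(new LinkedHashSet<>(intersectAll(left, right)));
    }

    public static void main(String[] args) {
        List<String> left = List.of("a", "b", "b", "c");
        List<String> right = List.of("b", "b", "b", "d");
        System.out.println(intersect(left, right));    // [b]
        System.out.println(intersectAll(left, right)); // [b, b]
    }
}
```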

### Minus

```java
Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "a, b, c");
Table result = left.minus(right);
```
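• The `minus` method is similar to SQL `EXCEPT`: it returns the rows of the left table that do not exist in the right table, each exactly once.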

### MinusAll

```java
Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "a, b, c");
Table result = left.minusAll(right);
```
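• The `minusAll` method is similar to SQL `EXCEPT ALL`: a row that occurs n times in the left table and m times in the right table is returned n - m times (not at all if m ≥ n).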
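
The following sketch illustrates the `EXCEPT` and `EXCEPT ALL` semantics on in-memory lists. `MinusSketch` is a hypothetical name and rows are modeled as strings; it only demonstrates the counting rule and makes no claim about Flink's execution.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Flink: each row is modeled as a String.
final class MinusSketch {
    // EXCEPT ALL: every right row cancels one matching left row,
    // so a row survives max(n - m, 0) times.
    static List<String> minusAll(List<String> left, List<String> right) {
        Map<String, Integer> toCancel = new HashMap<>();
        for (String row : right) {
            toCancel.merge(row, 1, Integer::sum);
        }
        List<String> out = new ArrayList<>();
        for (String row : left) {
            int m = toCancel.getOrDefault(row, 0);
            if (m > 0) {
                toCancel.put(row, m - 1); // cancelled by a right row
            } else {
                out.add(row);
            }
        }
        return out;
    }

    // EXCEPT: distinct left rows that do not occur on the right at all.
    static List<String> minus(List<String> left, List<String> right) {
        Set<String> excluded = new HashSet<>(right);
        Set<String> out = new LinkedHashSet<>();
        for (String row : left) {
            if (!excluded.contains(row)) {
                out.add(row);
            }
        }
        return new ArrayList<>(out);
    }

    public static void main(String[] args) {
        List<String> left = List.of("a", "b", "b", "b", "c", "c");
        List<String> right = List.of("b", "c", "c");
        System.out.println(minus(left, right));    // [a]
        System.out.println(minusAll(left, right)); // [a, b, b]
    }
}
```

Note that `minus` is not simply the de-duplicated `minusAll` result: in the example, `b` survives `minusAll` because the left side has more copies, yet it is absent from `minus` because it occurs on the right.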

### In

```java
Table left = ds1.toTable(tableEnv, "a, b, c");
Table right = ds2.toTable(tableEnv, "a");

// using implicit registration
Table result = left.select("a, b, c").where("a.in(" + right + ")");

// using explicit registration (alternative to the implicit form above)
tableEnv.registerTable("RightTable", right);
Table result = left.select("a, b, c").where("a.in(RightTable)");
```
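• The `in` method is similar to SQL `IN`: used inside a `where` clause, it keeps the rows whose expression value exists in the given single-column table.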
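
Conceptually, `a.in(RightTable)` filters the left rows by membership of column `a` in the right table's single column. A minimal sketch of that filter follows; `InSketch` and `whereAIn` are hypothetical names, rows are modeled as `List<Object>` with `a` at index 0, and SQL's NULL handling is deliberately ignored.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of Flink: a row is a List<Object>,
// and column "a" is assumed to be the first element.
final class InSketch {
    // Keeps the left rows whose column "a" appears in the right column.
    static List<List<Object>> whereAIn(List<List<Object>> left, List<Object> rightColumn) {
        Set<Object> allowed = new HashSet<>(rightColumn);
        List<List<Object>> out = new ArrayList<>();
        for (List<Object> row : left) {
            if (allowed.contains(row.get(0))) {
                out.add(row);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Object>> left = List.of(
            List.<Object>of(1, "x"),
            List.<Object>of(2, "y"),
            List.<Object>of(3, "z"));
        List<Object> right = List.<Object>of(1, 3);
        System.out.println(whereAIn(left, right)); // [[1, x], [3, z]]
    }
}
```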

## Table

```scala
class Table(

  //......

  def union(right: Table): Table = {
    // check that right table belongs to the same TableEnvironment
    if (right.tableEnv != this.tableEnv) {
      throw new ValidationException("Only tables from the same TableEnvironment can be unioned.")
    }
    new Table(tableEnv, Union(logicalPlan, right.logicalPlan, all = false).validate(tableEnv))
  }

  def unionAll(right: Table): Table = {
    // check that right table belongs to the same TableEnvironment
    if (right.tableEnv != this.tableEnv) {
      throw new ValidationException("Only tables from the same TableEnvironment can be unioned.")
    }
    new Table(tableEnv, Union(logicalPlan, right.logicalPlan, all = true).validate(tableEnv))
  }

  def intersect(right: Table): Table = {
    // check that right table belongs to the same TableEnvironment
    if (right.tableEnv != this.tableEnv) {
      throw new ValidationException(
        "Only tables from the same TableEnvironment can be intersected.")
    }
    new Table(tableEnv, Intersect(logicalPlan, right.logicalPlan, all = false).validate(tableEnv))
  }

  def intersectAll(right: Table): Table = {
    // check that right table belongs to the same TableEnvironment
    if (right.tableEnv != this.tableEnv) {
      throw new ValidationException(
        "Only tables from the same TableEnvironment can be intersected.")
    }
    new Table(tableEnv, Intersect(logicalPlan, right.logicalPlan, all = true).validate(tableEnv))
  }

  def minus(right: Table): Table = {
    // check that right table belongs to the same TableEnvironment
    if (right.tableEnv != this.tableEnv) {
      throw new ValidationException("Only tables from the same TableEnvironment can be " +
        "subtracted.")
    }
    new Table(tableEnv, Minus(logicalPlan, right.logicalPlan, all = false)
      .validate(tableEnv))
  }

  def minusAll(right: Table): Table = {
    // check that right table belongs to the same TableEnvironment
    if (right.tableEnv != this.tableEnv) {
      throw new ValidationException("Only tables from the same TableEnvironment can be " +
        "subtracted.")
    }
    new Table(tableEnv, Minus(logicalPlan, right.logicalPlan, all = true)
      .validate(tableEnv))
  }

  //......
}
```
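• Each method first checks that both tables belong to the same `TableEnvironment`. `union` and `unionAll` then build a `Union` node, `intersect` and `intersectAll` an `Intersect` node, and `minus` and `minusAll` a `Minus` node; the `all` flag distinguishes the two variants of each pair.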

### Union

```scala
case class Union(left: LogicalNode, right: LogicalNode, all: Boolean) extends BinaryNode {
  override def output: Seq[Attribute] = left.output

  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
    left.construct(relBuilder)
    right.construct(relBuilder)
    relBuilder.union(all)
  }

  override def validate(tableEnv: TableEnvironment): LogicalNode = {
    if (tableEnv.isInstanceOf[StreamTableEnvironment] && !all) {
      failValidation(s"Union on stream tables is currently not supported.")
    }

    val resolvedUnion = super.validate(tableEnv).asInstanceOf[Union]
    if (left.output.length != right.output.length) {
      failValidation(s"Union two tables of different column sizes:" +
        s" ${left.output.size} and ${right.output.size}")
    }
    val sameSchema = left.output.zip(right.output).forall { case (l, r) =>
      l.resultType == r.resultType
    }
    if (!sameSchema) {
      failValidation(s"Union two tables of different schema:" +
        s" [${left.output.map(a => (a.name, a.resultType)).mkString(", ")}] and" +
        s" [${right.output.map(a => (a.name, a.resultType)).mkString(", ")}]")
    }
    resolvedUnion
  }
}
```
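• `Union` extends `BinaryNode`. Its `construct` method builds the left and right inputs and then calls `relBuilder.union(all)`. Its `validate` method rejects a non-`all` union in a `StreamTableEnvironment` and requires both inputs to have the same number of columns with pairwise equal types.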
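
The schema check shared by the three `validate` methods compares column counts and then column types position by position; column names are not compared. A standalone sketch of that rule follows. `SchemaCheckSketch`, `Column`, and the type names used as strings are hypothetical stand-ins for illustration, not Flink classes.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical stand-ins, not Flink classes.
final class SchemaCheckSketch {
    // A column reduced to its name and a type name.
    static final class Column {
        final String name;
        final String type;

        Column(String name, String type) {
            this.name = name;
            this.type = type;
        }
    }

    // Returns an error message if the schemas are incompatible, empty otherwise.
    static Optional<String> validate(String op, List<Column> left, List<Column> right) {
        if (left.size() != right.size()) {
            return Optional.of(op + " two tables of different column sizes: "
                + left.size() + " and " + right.size());
        }
        for (int i = 0; i < left.size(); i++) {
            // Only types are compared; names may differ between the inputs.
            if (!left.get(i).type.equals(right.get(i).type)) {
                return Optional.of(op + " two tables of different schema at column " + i);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<Column> left = List.of(new Column("a", "INT"), new Column("b", "STRING"));
        List<Column> renamed = List.of(new Column("d", "INT"), new Column("e", "STRING"));
        List<Column> retyped = List.of(new Column("a", "INT"), new Column("b", "LONG"));
        System.out.println(validate("Intersect", left, renamed).isPresent()); // false
        System.out.println(validate("Union", left, retyped).isPresent());     // true
    }
}
```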

### Intersect

```scala
case class Intersect(left: LogicalNode, right: LogicalNode, all: Boolean) extends BinaryNode {
  override def output: Seq[Attribute] = left.output

  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
    left.construct(relBuilder)
    right.construct(relBuilder)
    relBuilder.intersect(all)
  }

  override def validate(tableEnv: TableEnvironment): LogicalNode = {
    if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
      failValidation(s"Intersect on stream tables is currently not supported.")
    }

    val resolvedIntersect = super.validate(tableEnv).asInstanceOf[Intersect]
    if (left.output.length != right.output.length) {
      failValidation(s"Intersect two tables of different column sizes:" +
        s" ${left.output.size} and ${right.output.size}")
    }
    // allow different column names between tables
    val sameSchema = left.output.zip(right.output).forall { case (l, r) =>
      l.resultType == r.resultType
    }
    if (!sameSchema) {
      failValidation(s"Intersect two tables of different schema:" +
        s" [${left.output.map(a => (a.name, a.resultType)).mkString(", ")}] and" +
        s" [${right.output.map(a => (a.name, a.resultType)).mkString(", ")}]")
    }
    resolvedIntersect
  }
}
```
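• `Intersect` extends `BinaryNode`. Its `construct` method builds both inputs and then calls `relBuilder.intersect(all)`. Its `validate` method rejects any intersect in a `StreamTableEnvironment` and applies the same column-count and column-type checks as `Union`; column names may differ between the two tables.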

### Minus

```scala
case class Minus(left: LogicalNode, right: LogicalNode, all: Boolean) extends BinaryNode {
  override def output: Seq[Attribute] = left.output

  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
    left.construct(relBuilder)
    right.construct(relBuilder)
    relBuilder.minus(all)
  }

  override def validate(tableEnv: TableEnvironment): LogicalNode = {
    if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
      failValidation(s"Minus on stream tables is currently not supported.")
    }

    val resolvedMinus = super.validate(tableEnv).asInstanceOf[Minus]
    if (left.output.length != right.output.length) {
      failValidation(s"Minus two table of different column sizes:" +
        s" ${left.output.size} and ${right.output.size}")
    }
    val sameSchema = left.output.zip(right.output).forall { case (l, r) =>
      l.resultType == r.resultType
    }
    if (!sameSchema) {
      failValidation(s"Minus two table of different schema:" +
        s" [${left.output.map(a => (a.name, a.resultType)).mkString(", ")}] and" +
        s" [${right.output.map(a => (a.name, a.resultType)).mkString(", ")}]")
    }
    resolvedMinus
  }
}
```
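• `Minus` extends `BinaryNode`. Its `construct` method builds both inputs and then calls `relBuilder.minus(all)`. Its `validate` method rejects any minus in a `StreamTableEnvironment` and applies the same column-count and column-type checks as `Union`.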
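
Taken together, the first check in each of the three `validate` methods shown above amounts to a small rule for stream environments: only `Union` with `all = true` passes. The sketch below restates that rule; `StreamSupportSketch` and `SetOp` are hypothetical names, and the rule reflects only the code quoted here.

```java
// Hypothetical restatement of the stream checks in the validate methods above.
final class StreamSupportSketch {
    enum SetOp { UNION, INTERSECT, MINUS }

    // On a stream environment, only UNION with all = true passes validation;
    // INTERSECT and MINUS fail regardless of the flag.
    static boolean supportedOnStream(SetOp op, boolean all) {
        return op == SetOp.UNION && all;
    }

    public static void main(String[] args) {
        for (SetOp op : SetOp.values()) {
            System.out.println(op + " all=false: " + supportedOnStream(op, false)
                + ", all=true: " + supportedOnStream(op, true));
        }
    }
}
```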

## Summary

• `Table` provides the `union`, `unionAll`, `intersect`, `intersectAll`, `minus`, and `minusAll` operations, plus `in`, which is used inside a `where` clause.
• `union` and `unionAll` build a `Union` node, `intersect` and `intersectAll` an `Intersect` node, and `minus` and `minusAll` a `Minus` node.
• `Union`, `Intersect`, and `Minus` all extend `BinaryNode`. Their `construct` methods build the operation through `relBuilder.union`, `relBuilder.intersect`, and `relBuilder.minus` respectively.