Skip to content

Commit

Permalink
Merge pull request #464 from apache/kamir-patch-1
Browse files Browse the repository at this point in the history
Kamir patch 1
  • Loading branch information
2pk03 authored Aug 23, 2024
2 parents 88dce06 + b998d5d commit 7895720
Show file tree
Hide file tree
Showing 49 changed files with 1,887 additions and 49 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ jobs:
uses: actions/setup-java@v2
with:
java-version: 11
distribution: 'adopt'
distribution: 'adopt'
- name: Install Protoc
run: sudo apt install -y protobuf-compiler
- uses: actions/cache@v2
Expand Down
10 changes: 10 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
**/*.iml
**/maven-wrapper.jar

.env
.env.sh

# Maven
**/target/

Expand All @@ -22,5 +25,12 @@ pom.xml.*
.vscode
.vscode/*


# Kafka integration configuration for development
/build/env.sh
/conf/kafka/default.properties
/wayang-platforms/wayang-java/src/main/resources/wayang-kafka-defaults.properties
/wayang-platforms/wayang-java/src/main/resources/wayang-kafka-defaults.properties.template

# Scala Plugin for VSCode
.metals
25 changes: 25 additions & 0 deletions build/_env.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
'''
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
'''

# Kafka secrets ...
export BOOTSTRAP_SERVER=...
export CLUSTER_API_KEY=...
export CLUSTER_API_SECRET=...
export SR_ENDPOINT=...
export SR_API_KEY=...
export SR_API_SECRET=...
Original file line number Diff line number Diff line change
Expand Up @@ -938,6 +938,44 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I
writeTextFileJava(url, toSerializableFunction(formatterUdf), udfLoad)
}

/**
* Write the data quanta in this instance to a text file. Triggers execution.
*
* @param topicName topicName to write into
* @param formatterUdf UDF to format data quanta to [[String]]s
* @param udfLoad optional [[LoadProfileEstimator]] for the `udf`
*/
def writeKafkaTopic(topicName: String,
formatterUdf: Out => String,
udfLoad: LoadProfileEstimator = null): Unit = {
writeKafkaTopicJava(topicName, toSerializableFunction(formatterUdf), udfLoad)
}

/**
* Write the data quanta in this instance to a text file. Triggers execution.
*
* @param url URL to the text file
* @param formatterUdf UDF to format data quanta to [[String]]s
* @param udfLoad optional [[LoadProfileEstimator]] for the `udf`
*/
def writeKafkaTopicJava(topicName: String,
formatterUdf: SerializableFunction[Out, String],
udfLoad: LoadProfileEstimator = null): Unit = {

val sink = new KafkaTopicSink[Out](
topicName,
new TransformationDescriptor(formatterUdf, basicDataUnitType[Out], basicDataUnitType[String], udfLoad)
)
sink.setName(s"Write to KafkaTopic $topicName")
this.connectTo(sink, 0)

// Do the execution.
this.planBuilder.sinks += sink
this.planBuilder.buildAndExecute()
this.planBuilder.sinks.clear()
}


/**
* Write the data quanta in this instance to a text file. Triggers execution.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,21 @@ trait DataQuantaBuilder[+This <: DataQuantaBuilder[_, Out], Out] extends Logging
this.dataQuanta().writeTextFileJava(url, formatterUdf, udfLoadProfileEstimator)
}

/**
* Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.KafkaTopicSink]]. This triggers
* execution of the constructed [[WayangPlan]].
*
* @param topicName of the Kafka topic to be written
* @return the collected data quanta
*/
def writeKafkaTopic(topicName: String,
formatterUdf: Out => String,
jobName: String,
udfLoadProfileEstimator: LoadProfileEstimator): Unit = {
this.javaPlanBuilder.withJobName(jobName)
this.dataQuanta().writeKafkaTopic(topicName, formatterUdf, udfLoadProfileEstimator)
}

/**
* Enriches the set of operations to [[Record]]-based ones. This instances must deal with data quanta of
* type [[Record]], though. Because of Java's type erasure, we need to leave it up to you whether this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import java.util.{Collection => JavaCollection}
import org.apache.commons.lang3.Validate
import org.apache.wayang.api.util.DataQuantaBuilderCache
import org.apache.wayang.basic.data.Record
import org.apache.wayang.basic.operators.{TableSource, TextFileSource}
import org.apache.wayang.basic.operators.{TableSource, TextFileSource, KafkaTopicSource}
import org.apache.wayang.commons.util.profiledb.model.Experiment
import org.apache.wayang.core.api.WayangContext
import org.apache.wayang.core.plan.wayangplan._
Expand Down Expand Up @@ -63,6 +63,15 @@ class JavaPlanBuilder(wayangCtx: WayangContext, jobName: String) {
createSourceBuilder(new TextFileSource(url))(ClassTag(classOf[String]))

/**
* Read a textmessages from a Kafka topic and provide it as a dataset of [[String]]s, one per message.
*
* @param topicName the topic's name
* @return [[DataQuantaBuilder]] for the content in the topic
*/
def readKafkaTopic(topicName: String): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, String], String] =
createSourceBuilder(new KafkaTopicSource(topicName))(ClassTag(classOf[String]))

/**
* Read a remote text file and provide it as a dataset of [[String]]s, one per line.
*
* @param url the URL of the text file
Expand All @@ -71,8 +80,6 @@ class JavaPlanBuilder(wayangCtx: WayangContext, jobName: String) {
def readRemoteTextFile(url: String): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, String], String] =
createSourceBuilder(new TextFileSource(url))(ClassTag(classOf[String]))



/**
* Reads a database table and provides them as a dataset of [[Record]]s.
*
Expand Down
5 changes: 5 additions & 0 deletions wayang-commons/wayang-basic/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.5.1</version> <!-- Use the latest version available -->
</dependency>
</dependencies>

</project>
Loading

0 comments on commit 7895720

Please sign in to comment.