Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Nwang/add custom operator refactor #3049

Closed

Conversation

nwangtw
Copy link
Contributor

@nwangtw nwangtw commented Oct 17, 2018

This is the implementation of CustomOperator based on this design doc:
https://docs.google.com/document/d/1XzF0IlfuaaW8Gx3cPx1xLtP-kgCFK0TRNS5aAzuMuMg/edit#

Todos in future PRs:

  • add grouper
  • add/refactor windowed custom operator
  • scala support
  • refactor/simplify existing operators (v2 design in doc)

Existing logic doesn't change.

@huijunwu huijunwu requested review from nlu90 and kramasamy October 18, 2018 00:01
Copy link
Member

@nlu90 nlu90 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I provided some feedback based on my limited knowledge of the streamlet part.

@srkukarni @jerrypeng Could you guys also take a look if you have some time?

@@ -22,6 +22,8 @@
import java.util.List;

import org.apache.heron.classification.InterfaceStability;
import org.apache.heron.streamlet.impl.operators.IStreamletBasicOperator;
import org.apache.heron.streamlet.impl.operators.IStreamletOperator;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interface class should be placed in directory streamlet instead of streamlet/impl.

Only concrete implementations should be put into the impl directory.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sense. Will do.

* @param <T> The return type of the transform
* @return Streamlet containing the output of the operation
*/
<T> Streamlet<T> perform(IStreamletOperator<R, T> operator);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perform is not very intuitive for the meaning of this method's actual action. Let's come up with another name. How about applyOperator?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yao and I thought about a few options and perform is the best we got.

apply() indeed is better, however it is a special function in Java/Scala so it cant be used. applyOperator() is valid, but maybe the "Operator" part sounds a bit redundant?

I am totally open for a better name. Let's see other thoughts.

* @param op The user defined CustomeBasicOperator
*/
public CustomBasicStreamlet(StreamletImpl<R> parent,
IStreamletBasicOperator<R, T> op) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for custom operator, grouping strategy should also be provided

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. Grouping is definitely needed.

It will do in the next PR. After the grouping refactor PR is merged. #3040

@@ -22,6 +22,8 @@
import java.util.List;

import org.apache.heron.classification.InterfaceStability;
import org.apache.heron.streamlet.impl.operators.IStreamletBasicOperator;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given all the existing StreamletOperator extends BaseRichBolt, should we also enforce custom operator to be BaseRichBolt?

Thus we can reduce this redundant IStreamletBasicOperator.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yesh. I think we should enforce all StreamletOperator to be IRichBolt. CustomOperator will be the base class for all user defined operators (non-windowed onces) hence they will be IRichBolt.

IStreamletBasicOperator is not for CustomOperator. It is for reusing existing user Bolts (the ones extend from IBasicBolt instead of IRichBolt) only. That is why we have only IStreamletBasicOperator but no CustomBasicOperator.

import org.apache.heron.api.bolt.IRichBolt;

/**
* The interface for custom operators: including new user defined operators as well as
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment here is inconsistent with the actual code.

This interface is actually for all the operators not only for the custom operators.

Need to update this comment or the code to keep them consistent.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Will do.

@@ -22,6 +22,8 @@
import java.util.List;

import org.apache.heron.classification.InterfaceStability;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we may also need to add CustomSource to allow users to provide specific spout as the input?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. It is in another PR already. #3032 (may need refactor though)

/**
* CustomOperator is the base class for all user defined operators.
* Usage:
* 1. Create user defined operator
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part needs further discussion. IMO, users should only need to provide the Bolt to the streamlet API instead of having direct access to streamlet's operator implementation and being required to implement an additional process method.

Once the desired bolt is provided, streamlet should take care of the remaining stuff.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not an addition method users need to implement.

This class is for all "new" user defined streamlet operators and there is no Bolt concept in it.

Existing Bolts (or write operators in low level way) is supported but it another different use case.


class UserBoltOperator
    extends UserBolt
    implements ICustomOperator<InputType, OutputType> {
}

Then the bolt is converted to a streamlet operator and can be used in Streamlet API.

source
   ....
  .perform(new UserBoltOperator())
   ....

@nwangtw nwangtw closed this Oct 24, 2018
@nwangtw nwangtw deleted the nwang/add_custom_operator_refactor branch October 24, 2018 20:19
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants