Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
anton-vinogradov committed Jul 24, 2024
1 parent 5c3cf86 commit 83c12df
Show file tree
Hide file tree
Showing 50 changed files with 925 additions and 728 deletions.
26 changes: 24 additions & 2 deletions modules/core/src/main/java/org/apache/ignite/cdc/CdcEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.ignite.cache.CacheEntryVersion;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.lang.IgniteExperimental;
import org.apache.ignite.spi.systemview.view.CacheView;
import org.jetbrains.annotations.Nullable;
Expand All @@ -39,12 +41,32 @@ public interface CdcEvent extends Serializable {
/**
* @return Key for the changed entry.
*/
public Object key();
public KeyCacheObject key();

/**
* @return Value for the changed entry or {@code null} in case of entry removal.
*/
@Nullable public Object value();
@Nullable public CacheObject value();

/**
* @return Previous entry state metadata if expected.
*/
@Nullable public CacheObject previousStateMetadata();

/**
* @return Key which was placed into cache. Or null if failed to convert.
*/
public Object unwrappedKey();

/**
* @return Value which was placed into cache. Or null for delete operation or for failure.
*/
public Object unwrappedValue();

/**
* @return Previous entry state metadata.
*/
public Object unwrappedPreviousStateMetadata();

/**
* @return {@code True} if event fired on primary node for partition containing this entry.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@
import org.apache.ignite.cache.CacheEntryVersion;
import org.apache.ignite.cdc.CdcConsumer;
import org.apache.ignite.cdc.CdcEvent;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.UnwrappedDataEntry;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;

/**
* Event of single entry change.
Expand All @@ -33,87 +38,69 @@ public class CdcEventImpl implements CdcEvent {
/** Serial version uid. */
private static final long serialVersionUID = 0L;

/** Key. */
private final Object key;
/** Entry. */
private final DataEntry entry;

/** Value. */
private final Object val;

/** {@code True} if changes made on primary node. */
private final boolean primary;

/** Partition. */
private final int part;
/**
* @param entry Entry.
*/
public CdcEventImpl(DataEntry entry) {
this.entry = entry;
}

/** Order of the entry change. */
private final CacheEntryVersion ord;
/** {@inheritDoc} */
@Override public Object unwrappedKey() {
return ((UnwrappedDataEntry)(entry)).unwrappedKey();
}

/** Cache id. */
private final int cacheId;
/** {@inheritDoc} */
@Override public Object unwrappedValue() {
return ((UnwrappedDataEntry)(entry)).unwrappedValue();
}

/** Expire time. */
private final long expireTime;
/** {@inheritDoc} */
@Override public Object unwrappedPreviousStateMetadata() {
return ((UnwrappedDataEntry)(entry)).unwrappedPreviousStateMetadata();
}

/**
* @param key Key.
* @param val Value.
* @param primary {@code True} if changes made on primary node.
* @param part Partition.
* @param ord Order of the entry change.
* @param cacheId Cache id.
* @param expireTime Expire time.
*/
public CdcEventImpl(
Object key,
Object val,
boolean primary,
int part,
CacheEntryVersion ord,
int cacheId,
long expireTime
) {
this.key = key;
this.val = val;
this.primary = primary;
this.part = part;
this.ord = ord;
this.cacheId = cacheId;
this.expireTime = expireTime;
/** {@inheritDoc} */
@Override public KeyCacheObject key() {
return entry.key();
}

/** {@inheritDoc} */
@Override public Object key() {
return key;
@Override public CacheObject value() {
return entry.value();
}

/** {@inheritDoc} */
@Override public Object value() {
return val;
@Override public @Nullable CacheObject previousStateMetadata() {
return entry.previousStateMetadata();
}

/** {@inheritDoc} */
@Override public boolean primary() {
return primary;
return (entry.flags() & DataEntry.PRIMARY_FLAG) != 0;
}

/** {@inheritDoc} */
@Override public int partition() {
return part;
return entry.partitionId();
}

/** {@inheritDoc} */
@Override public CacheEntryVersion version() {
return ord;
return entry.writeVersion();
}

/** {@inheritDoc} */
@Override public int cacheId() {
return cacheId;
return entry.cacheId();
}

/** {@inheritDoc} */
@Override public long expireTime() {
return expireTime;
return entry.expireTime();
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ private void consumeSegmentActively(IgniteWalIteratorFactory.IteratorParametersB
boolean interrupted;

do {
boolean commit = consumer.onRecords(iter, WalRecordsConsumer.CDC_EVENT_TRANSFORMER, null);
boolean commit = consumer.onRecords(iter, null);

if (commit)
saveStateAndRemoveProcessed(iter.state());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -89,21 +88,6 @@ public class WalRecordsConsumer<K, V> {
return OPERATIONS_TYPES.contains(e.op());
};

/** Event transformer. */
static final IgniteClosure<DataEntry, CdcEvent> CDC_EVENT_TRANSFORMER = e -> {
UnwrapDataEntry ue = (UnwrapDataEntry)e;

return new CdcEventImpl(
ue.unwrappedKey(),
ue.unwrappedValue(),
(e.flags() & DataEntry.PRIMARY_FLAG) != 0,
e.partitionId(),
e.writeVersion(),
e.cacheId(),
e.expireTime()
);
};

/**
* @param consumer User provided CDC consumer.
* @param log Logger.
Expand All @@ -119,15 +103,10 @@ public WalRecordsConsumer(CdcConsumer consumer, IgniteLogger log) {
* {@link DataRecord} will be stored and WAL iteration will be started from it on CDC application fail/restart.
*
* @param entries Data entries iterator.
* @param transform Event transformer.
* @param filter Optional event filter.
* @return {@code True} if current offset in WAL should be commited.
*/
public boolean onRecords(
Iterator<DataEntry> entries,
IgniteClosure<DataEntry, CdcEvent> transform,
@Nullable IgnitePredicate<? super DataEntry> filter
) {
public boolean onRecords(Iterator<DataEntry> entries, @Nullable IgnitePredicate<? super DataEntry> filter) {
Iterator<CdcEvent> evts = F.iterator(new Iterator<DataEntry>() {
@Override public boolean hasNext() {
return entries.hasNext();
Expand All @@ -142,7 +121,7 @@ public boolean onRecords(

return next;
}
}, transform, true, OPERATIONS_FILTER, filter);
}, CdcEventImpl::new, true, OPERATIONS_FILTER, filter);

return consumer.onEvents(evts);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ private void resendCacheData(IgniteInternalCache<?, ?> cache) throws IgniteCheck
row.expireTime(),
key.partition(),
-1,
null, // Conflict resolve should not happen at data copying.
DataEntry.flags(true))
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ public class DataEntry {
/** */
public static final byte FROM_STORE_FLAG = 0b00000100;

/** */
public static final byte PREV_STATE_FLAG = 0b00001000;

/** Cache ID. */
@GridToStringInclude
protected int cacheId;
Expand Down Expand Up @@ -74,6 +77,9 @@ public class DataEntry {
@GridToStringInclude
protected long partCnt;

/** Previous entry state metadata. */
protected CacheObject prevStateMeta;

/**
* Bit flags.
* <ul>
Expand All @@ -100,6 +106,7 @@ private DataEntry() {
* @param expireTime Expire time.
* @param partId Partition ID.
* @param partCnt Partition counter.
* @param prevStateMeta Previous entry state metadata.
* @param flags Entry flags.
*/
public DataEntry(
Expand All @@ -112,6 +119,7 @@ public DataEntry(
long expireTime,
int partId,
long partCnt,
CacheObject prevStateMeta,
byte flags
) {
this.cacheId = cacheId;
Expand All @@ -123,8 +131,12 @@ public DataEntry(
this.expireTime = expireTime;
this.partId = partId;
this.partCnt = partCnt;
this.prevStateMeta = prevStateMeta;
this.flags = flags;

if (this.prevStateMeta != null)
this.flags |= PREV_STATE_FLAG;

// Only READ, CREATE, UPDATE and DELETE operations should be stored in WAL.
assert op == GridCacheOperation.READ
|| op == GridCacheOperation.CREATE
Expand Down Expand Up @@ -231,6 +243,13 @@ public long expireTime() {
return expireTime;
}

/**
* Previous entry state metadata.
*/
public CacheObject previousStateMetadata() {
return prevStateMeta;
}

/**
* Entry flags.
* @see #flags
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ public class LazyDataEntry extends DataEntry {
/** Value value bytes. */
private byte[] valBytes;

/** Previous entry state metadata bytes type code. See {@link CacheObject} for built-in value type codes */
private byte prevStateMetaType;

/** Previous entry state metadata bytes. */
private byte[] prevStateMetaBytes;

/**
* @param cctx Shared context.
* @param cacheId Cache ID.
Expand All @@ -60,6 +66,8 @@ public class LazyDataEntry extends DataEntry {
* @param expireTime Expire time.
* @param partId Partition ID.
* @param partCnt Partition counter.
* @param prevStateMetaType Object type code for previous entry state metadata.
* @param prevStateMetaBytes Previous entry state metadata bytes.
* @param flags Flags.
*/
public LazyDataEntry(
Expand All @@ -75,15 +83,19 @@ public LazyDataEntry(
long expireTime,
int partId,
long partCnt,
byte prevStateMetaType,
byte[] prevStateMetaBytes,
byte flags
) {
super(cacheId, null, null, op, nearXidVer, writeVer, expireTime, partId, partCnt, flags);
super(cacheId, null, null, op, nearXidVer, writeVer, expireTime, partId, partCnt, null, flags);

this.cctx = cctx;
this.keyType = keyType;
this.keyBytes = keyBytes;
this.valType = valType;
this.valBytes = valBytes;
this.prevStateMetaType = prevStateMetaType;
this.prevStateMetaBytes = prevStateMetaBytes;
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -126,6 +138,22 @@ public LazyDataEntry(
return val;
}

/** {@inheritDoc} */
@Override public CacheObject previousStateMetadata() {
if (prevStateMeta == null && prevStateMetaBytes != null) {
GridCacheContext cacheCtx = cctx.cacheContext(cacheId);

if (cacheCtx == null)
throw new IgniteException("Failed to find cache context for the given cache ID: " + cacheId);

IgniteCacheObjectProcessor co = cctx.kernalContext().cacheObjects();

prevStateMeta = co.toCacheObject(cacheCtx.cacheObjectContext(), prevStateMetaType, prevStateMetaBytes);
}

return prevStateMeta;
}

/** @return Data Entry Key type code. See {@link CacheObject} for built-in value type codes */
public byte getKeyType() {
return keyType;
Expand All @@ -145,4 +173,14 @@ public byte getValType() {
public byte[] getValBytes() {
return valBytes;
}

/** {@inheritDoc} */
@Override public byte getPreviousStateMetadataType() {
return prevStateMetaType;
}

/** {@inheritDoc} */
@Override public byte[] getPreviousStateMetadataBytes() {
return prevStateMetaBytes;
}
}
Empty file.
Loading

0 comments on commit 83c12df

Please sign in to comment.