Skip to content

Commit

Permalink
change
Browse files Browse the repository at this point in the history
Signed-off-by: gengjun-git <[email protected]>
  • Loading branch information
gengjun-git committed Aug 14, 2024
1 parent e833dcc commit 1829033
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.starrocks.common.ErrorCode;
import com.starrocks.common.ErrorReport;
import com.starrocks.common.Pair;
import com.starrocks.persist.ImageWriter;
import com.starrocks.persist.gson.GsonUtils;
import com.starrocks.persist.metablock.SRMetaBlockException;
import com.starrocks.persist.metablock.SRMetaBlockID;
Expand All @@ -36,7 +37,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -309,12 +309,12 @@ public Optional<Pipe> mayGetPipe(PipeName name) {
}
}

public void save(DataOutputStream dos) throws IOException, SRMetaBlockException {
public void save(ImageWriter imageWriter) throws IOException, SRMetaBlockException {
try {
lock.readLock().lock();
final int cnt = 1 + pipeMap.size();
SRMetaBlockWriter writer = new SRMetaBlockWriter(dos, SRMetaBlockID.PIPE_MGR, cnt);
writer.writeJson(pipeMap.size());
SRMetaBlockWriter writer = imageWriter.getBlockWriter(SRMetaBlockID.PIPE_MGR, cnt);
writer.writeInt(pipeMap.size());
for (Pipe pipe : pipeMap.values()) {
writer.writeJson(pipe);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.starrocks.load.pipe;

import com.starrocks.persist.ImageWriter;
import com.starrocks.persist.PipeOpEntry;
import com.starrocks.persist.metablock.SRMetaBlockEOFException;
import com.starrocks.persist.metablock.SRMetaBlockException;
Expand All @@ -22,7 +23,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataOutputStream;
import java.io.IOException;

/**
Expand Down Expand Up @@ -68,8 +68,8 @@ public void load(SRMetaBlockReader reader) throws IOException, SRMetaBlockExcept
LOG.info("loaded {} pipes", cnt);
}

public void save(DataOutputStream dos) throws IOException, SRMetaBlockException {
pipeManager.save(dos);
public void save(ImageWriter imageWriter) throws IOException, SRMetaBlockException {
pipeManager.save(imageWriter);
}

public void replay(PipeOpEntry entry) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@
import org.apache.logging.log4j.Logger;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.logging.log4j.Logger;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
Expand Down Expand Up @@ -1690,6 +1689,7 @@ public void saveImage(ImageWriter imageWriter, File curFile) throws IOException
dictionaryMgr.save(imageWriter);
replicationMgr.save(imageWriter);
keyMgr.save(imageWriter);
pipeManager.getRepo().save(imageWriter);
} catch (SRMetaBlockException e) {
LOG.error("Save meta block failed ", e);
throw new IOException("Save meta block failed ", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.starrocks.persist.AlterUserInfo;
import com.starrocks.persist.CreateUserInfo;
import com.starrocks.persist.OperationType;
import com.starrocks.persist.metablock.SRMetaBlockReader;
import com.starrocks.persist.metablock.SRMetaBlockReaderV2;
import com.starrocks.privilege.AuthorizationMgr;
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.DDLStmtExecutor;
Expand Down Expand Up @@ -262,7 +264,7 @@ public void testCreateUserPersistWithProperties() throws Exception {
// 1. create empty image
UtFrameUtils.PseudoJournalReplayer.resetFollowerJournalQueue();
UtFrameUtils.PseudoImage emptyImage = new UtFrameUtils.PseudoImage();
masterManager.saveV2(emptyImage.getDataOutputStream());
masterManager.saveV2(emptyImage.getImageWriter());

// 2. create user with properties
String sql = "create user user123 properties (\"session.tx_visible_wait_timeout\" = \"100\", " +
Expand All @@ -285,13 +287,13 @@ public void testCreateUserPersistWithProperties() throws Exception {

// 3. save final image
UtFrameUtils.PseudoImage finalImage = new UtFrameUtils.PseudoImage();
masterManager.saveV2(finalImage.getDataOutputStream());
masterManager.saveV2(finalImage.getImageWriter());

// 4 verify replay...

// 4.1 load empty image
AuthenticationMgr followerManager = new AuthenticationMgr();
SRMetaBlockReader srMetaBlockReader = new SRMetaBlockReader(emptyImage.getDataInputStream());
SRMetaBlockReader srMetaBlockReader = new SRMetaBlockReaderV2(emptyImage.getJsonReader());
followerManager.loadV2(srMetaBlockReader);

// 4.2 replay update user property
Expand All @@ -311,7 +313,7 @@ public void testCreateUserPersistWithProperties() throws Exception {

// 4.3 verify final image
AuthenticationMgr finalManager = new AuthenticationMgr();
srMetaBlockReader = new SRMetaBlockReader(finalImage.getDataInputStream());
srMetaBlockReader = new SRMetaBlockReaderV2(finalImage.getJsonReader());
finalManager.loadV2(srMetaBlockReader);
userProperty = finalManager.getUserProperty(user);
Assert.assertEquals(2, userProperty.getSessionVariables().size());
Expand Down Expand Up @@ -382,7 +384,7 @@ public void testAlterPersistWithProperties() throws Exception {
// 1. create empty image
UtFrameUtils.PseudoJournalReplayer.resetFollowerJournalQueue();
UtFrameUtils.PseudoImage emptyImage = new UtFrameUtils.PseudoImage();
masterManager.saveV2(emptyImage.getDataOutputStream());
masterManager.saveV2(emptyImage.getImageWriter());

// create two catalogs
String catalogName = "catalog";
Expand Down Expand Up @@ -419,13 +421,13 @@ public void testAlterPersistWithProperties() throws Exception {

// 4. save final image
UtFrameUtils.PseudoImage finalImage = new UtFrameUtils.PseudoImage();
masterManager.saveV2(finalImage.getDataOutputStream());
masterManager.saveV2(finalImage.getImageWriter());

// 5 verify replay...

// 5.1 load empty image
AuthenticationMgr followerManager = new AuthenticationMgr();
SRMetaBlockReader srMetaBlockReader = new SRMetaBlockReader(emptyImage.getDataInputStream());
SRMetaBlockReader srMetaBlockReader = new SRMetaBlockReaderV2(emptyImage.getJsonReader());
followerManager.loadV2(srMetaBlockReader);

// 5.2 replay create user
Expand Down Expand Up @@ -456,7 +458,7 @@ public void testAlterPersistWithProperties() throws Exception {

// 4.3 verify final image
AuthenticationMgr finalManager = new AuthenticationMgr();
srMetaBlockReader = new SRMetaBlockReader(finalImage.getDataInputStream());
srMetaBlockReader = new SRMetaBlockReaderV2(finalImage.getJsonReader());
finalManager.loadV2(srMetaBlockReader);
userProperty = finalManager.getUserProperty("user1");
Assert.assertEquals(1, userProperty.getSessionVariables().size());
Expand Down Expand Up @@ -747,7 +749,7 @@ public void testSetUserPropertyPersist() throws Exception {
// 1. create empty image
UtFrameUtils.PseudoJournalReplayer.resetFollowerJournalQueue();
UtFrameUtils.PseudoImage emptyImage = new UtFrameUtils.PseudoImage();
masterManager.saveV2(emptyImage.getDataOutputStream());
masterManager.saveV2(emptyImage.getImageWriter());

// 2. update user property
String sql = "set property for 'root' 'max_user_connections' = '555'";
Expand All @@ -757,13 +759,13 @@ public void testSetUserPropertyPersist() throws Exception {

// 3. save final image
UtFrameUtils.PseudoImage finalImage = new UtFrameUtils.PseudoImage();
masterManager.saveV2(finalImage.getDataOutputStream());
masterManager.saveV2(finalImage.getImageWriter());

// 4 verify replay...

// 4.1 load empty image
AuthenticationMgr followerManager = new AuthenticationMgr();
SRMetaBlockReader srMetaBlockReader = new SRMetaBlockReader(emptyImage.getDataInputStream());
SRMetaBlockReader srMetaBlockReader = new SRMetaBlockReaderV2(emptyImage.getJsonReader());
followerManager.loadV2(srMetaBlockReader);

// 4.2 replay update user property
Expand All @@ -774,7 +776,7 @@ public void testSetUserPropertyPersist() throws Exception {

// 4.3 verify final image
AuthenticationMgr finalManager = new AuthenticationMgr();
srMetaBlockReader = new SRMetaBlockReader(finalImage.getDataInputStream());
srMetaBlockReader = new SRMetaBlockReaderV2(finalImage.getJsonReader());
finalManager.loadV2(srMetaBlockReader);
Assert.assertTrue(finalManager.doesUserExist(UserIdentity.ROOT));
Assert.assertEquals(555, finalManager.getMaxConn("root"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.starrocks.persist.OperationType;
import com.starrocks.persist.PipeOpEntry;
import com.starrocks.persist.metablock.SRMetaBlockReader;
import com.starrocks.persist.metablock.SRMetaBlockReaderV2;
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.SessionVariable;
import com.starrocks.qe.ShowExecutor;
Expand Down Expand Up @@ -237,11 +238,11 @@ public void persistPipe() throws Exception {
CreatePipeStmt createStmt = (CreatePipeStmt) UtFrameUtils.parseStmtWithNewParser(sql, ctx);
pm.createPipe(createStmt);
UtFrameUtils.PseudoImage image1 = new UtFrameUtils.PseudoImage();
pm.getRepo().save(image1.getDataOutputStream());
pm.getRepo().save(image1.getImageWriter());

// restore from image
PipeManager pm1 = new PipeManager();
SRMetaBlockReader reader = new SRMetaBlockReader(image1.getDataInputStream());
SRMetaBlockReader reader = new SRMetaBlockReaderV2(image1.getJsonReader());
pm1.getRepo().load(reader);
reader.close();
Assert.assertEquals(pm.getPipesUnlock(), pm1.getPipesUnlock());
Expand All @@ -255,11 +256,11 @@ public void persistPipe() throws Exception {
AlterPipeStmt alterPipeStmt = (AlterPipeStmt) UtFrameUtils.parseStmtWithNewParser(sql, ctx);
pm.alterPipe(alterPipeStmt);
UtFrameUtils.PseudoImage image2 = new UtFrameUtils.PseudoImage();
pm.getRepo().save(image2.getDataOutputStream());
pm.getRepo().save(image2.getImageWriter());

// restore and check
PipeManager pm2 = new PipeManager();
reader = new SRMetaBlockReader(image2.getDataInputStream());
reader = new SRMetaBlockReaderV2(image2.getJsonReader());
pm2.getRepo().load(reader);
reader.close();
Assert.assertEquals(pm.getPipesUnlock(), pm2.getPipesUnlock());
Expand Down Expand Up @@ -1037,7 +1038,7 @@ public void testRecovery() throws Exception {
"create pipe p_crash as insert into tbl select * from files('path'='fake://pipe', 'format'='parquet')";
createPipe(sql);
UtFrameUtils.PseudoImage image1 = new UtFrameUtils.PseudoImage();
pm.getRepo().save(image1.getDataOutputStream());
pm.getRepo().save(image1.getImageWriter());

// loading file and crash
String name = "p_crash";
Expand All @@ -1051,7 +1052,7 @@ public void testRecovery() throws Exception {
{
PipeManager pm1 = new PipeManager();
FileListRepo repo = pipe.getPipeSource().getFileListRepo();
SRMetaBlockReader reader = new SRMetaBlockReader(image1.getDataInputStream());
SRMetaBlockReader reader = new SRMetaBlockReaderV2(image1.getJsonReader());
pm1.getRepo().load(reader);
reader.close();
Assert.assertEquals(pm.getPipesUnlock(), pm1.getPipesUnlock());
Expand All @@ -1078,7 +1079,7 @@ public TransactionStatus getLabelStatus(long dbId, String label) {
};

PipeManager pm1 = new PipeManager();
SRMetaBlockReader reader = new SRMetaBlockReader(image1.getDataInputStream());
SRMetaBlockReader reader = new SRMetaBlockReaderV2(image1.getJsonReader());
pm1.getRepo().load(reader);
reader.close();
Assert.assertEquals(pm.getPipesUnlock(), pm1.getPipesUnlock());
Expand Down

0 comments on commit 1829033

Please sign in to comment.