Skip to content

Commit

Permalink
source-sqlserver: Discover CDC instance role_name
Browse files Browse the repository at this point in the history
This is useful information for debugging.
  • Loading branch information
willdonnelly committed Feb 14, 2025
1 parent 65cb31d commit c8f542a
Showing 1 changed file with 9 additions and 2 deletions.
11 changes: 9 additions & 2 deletions source-sqlserver/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -1030,6 +1030,7 @@ func cdcGetMaxLSN(ctx context.Context, conn *sql.DB) (LSN, error) {
type captureInstanceInfo struct {
Name string
TableSchema, TableName string
RoleName string
StartLSN LSN
CreateDate time.Time
}
Expand All @@ -1039,7 +1040,7 @@ type captureInstanceInfo struct {
func cdcListCaptureInstances(ctx context.Context, conn *sql.DB) (map[sqlcapture.StreamID][]*captureInstanceInfo, error) {
// This query will enumerate all "capture instances" currently present, along with the
// schema/table names identifying the source table.
const query = `SELECT ct.capture_instance, sch.name, tbl.name, ct.start_lsn, ct.create_date
const query = `SELECT ct.capture_instance, sch.name, tbl.name, ct.start_lsn, ct.create_date, ct.role_name
FROM cdc.change_tables AS ct
JOIN sys.tables AS tbl ON ct.source_object_id = tbl.object_id
JOIN sys.schemas AS sch ON tbl.schema_id = sch.schema_id;`
Expand All @@ -1053,9 +1054,15 @@ func cdcListCaptureInstances(ctx context.Context, conn *sql.DB) (map[sqlcapture.
var captureInstances = make(map[sqlcapture.StreamID][]*captureInstanceInfo)
for rows.Next() {
var info captureInstanceInfo
if err := rows.Scan(&info.Name, &info.TableSchema, &info.TableName, &info.StartLSN, &info.CreateDate); err != nil {
var roleName *string
if err := rows.Scan(&info.Name, &info.TableSchema, &info.TableName, &info.StartLSN, &info.CreateDate, &roleName); err != nil {
return nil, fmt.Errorf("error scanning result row: %w", err)
}
if roleName != nil {
info.RoleName = *roleName
} else {
info.RoleName = ""
}

// In SQL Server, every source table may have up to two "capture instances" associated with it.
// If a capture instance exists which satisfies the configured naming pattern, then that's one
Expand Down

0 comments on commit c8f542a

Please sign in to comment.