From f1b5c019f1147d4b9761c914499ac3f1a23797e2 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Wed, 9 Feb 2022 22:28:37 +0000 Subject: [PATCH] chore: enable t.Parallel() wherever possible - notably _not_ in the functional tests for now as they currently need to run in isolation Ref: https://pkg.go.dev/testing#T.Parallel --- .golangci.yml | 6 ++ Makefile | 2 +- acl_create_request_test.go | 2 + acl_create_response_test.go | 1 + acl_delete_request_test.go | 2 + acl_delete_response_test.go | 1 + acl_describe_request_test.go | 2 + acl_describe_response_test.go | 1 + acl_types_test.go | 4 ++ add_offsets_to_txn_request_test.go | 1 + add_offsets_to_txn_response_test.go | 1 + add_partitions_to_txn_request_test.go | 1 + add_partitions_to_txn_response_test.go | 1 + admin_test.go | 40 +++++++++++++ alter_client_quotas_request_test.go | 1 + alter_client_quotas_response_test.go | 1 + alter_configs_request_test.go | 1 + alter_configs_response_test.go | 1 + alter_partition_reassignments_request_test.go | 1 + ...r_partition_reassignments_response_test.go | 1 + alter_user_scram_credentials_request_test.go | 1 + alter_user_scram_credentials_response_test.go | 1 + api_versions_request_test.go | 2 + api_versions_response_test.go | 2 + async_producer_test.go | 30 +++++++++- balance_strategy_test.go | 59 +++++++++++++++++++ broker_test.go | 23 ++++++++ client_test.go | 58 ++++++++++++++---- client_tls_test.go | 8 ++- config_test.go | 10 ++++ consumer_group_members_test.go | 3 + consumer_metadata_request_test.go | 1 + consumer_metadata_response_test.go | 2 + consumer_test.go | 39 +++++++++++- control_record_test.go | 1 + create_partitions_request_test.go | 1 + create_partitions_response_test.go | 2 + create_topics_request_test.go | 1 + create_topics_response_test.go | 2 + delete_groups_request_test.go | 1 + delete_groups_response_test.go | 1 + delete_offsets_request_test.go | 1 + delete_offsets_response_test.go | 1 + delete_records_request_test.go | 1 + delete_records_response_test.go | 1 + delete_topics_request_test.go | 2 + delete_topics_response_test.go | 1 + describe_client_quotas_request_test.go | 1 + describe_client_quotas_response_test.go | 1 + describe_configs_request_test.go | 2 + describe_configs_response_test.go | 5 ++ describe_groups_request_test.go | 1 + describe_groups_response_test.go | 1 + describe_log_dirs_request_test.go | 1 + describe_log_dirs_response_test.go | 1 + ...ibe_user_scram_credentials_request_test.go | 1 + ...be_user_scram_credentials_response_test.go | 1 + end_txn_request_test.go | 1 + end_txn_response_test.go | 1 + examples/http_server/http_server_test.go | 3 + fetch_request_test.go | 6 ++ fetch_response_test.go | 8 +++ find_coordinator_request_test.go | 1 + find_coordinator_response_test.go | 1 + heartbeat_request_test.go | 1 + heartbeat_response_test.go | 1 + incremental_alter_configs_request_test.go | 1 + incremental_alter_configs_response_test.go | 1 + init_producer_id_request_test.go | 1 + init_producer_id_response_test.go | 1 + join_group_request_test.go | 4 ++ join_group_response_test.go | 3 + kerberos_client_test.go | 4 ++ leave_group_request_test.go | 1 + leave_group_response_test.go | 1 + list_groups_request_test.go | 1 + list_groups_response_test.go | 1 + list_partition_reassignments_request_test.go | 1 + list_partition_reassignments_response_test.go | 1 + message_test.go | 8 +++ metadata_request_test.go | 6 ++ metadata_response_test.go | 7 +++ metrics_test.go | 2 + mocks/async_producer_test.go | 6 ++ mocks/consumer_test.go | 13 ++++ mocks/sync_producer_test.go | 10 ++++ offset_commit_request_test.go | 3 + offset_commit_response_test.go | 3 + offset_fetch_request_test.go | 3 + offset_fetch_response_test.go | 2 + offset_manager_test.go | 15 +++++ offset_request_test.go | 4 ++ offset_response_test.go | 3 + partitioner_test.go | 9 +++ produce_request_test.go | 1 + produce_response_test.go | 3 + produce_set_test.go | 10 ++++ record_test.go | 2 + records_test.go | 2 + response_header_test.go | 2 + sasl_authenticate_request_test.go | 1 + sasl_authenticate_response_test.go | 1 + sasl_handshake_request_test.go | 1 + sasl_handshake_response_test.go | 1 + scram_formatter_test.go | 2 + sticky_assignor_user_data_test.go | 2 + sync_group_request_test.go | 1 + sync_group_response_test.go | 1 + sync_producer_test.go | 5 ++ txn_offset_commit_request_test.go | 1 + txn_offset_commit_response_test.go | 1 + utils_test.go | 2 + 112 files changed, 504 insertions(+), 14 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index f92a7167c..c1e71ac4f 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -59,6 +59,7 @@ linters: - misspell # - nakedret - nilerr + - paralleltest # - scopelint - staticcheck - structcheck @@ -72,5 +73,10 @@ linters: issues: exclude: - "G404: Use of weak random number generator" + exclude-rules: + # exclude some linters from running on certains files. + - path: functional.*_test\.go + linters: + - paralleltest # maximum count of issues with the same text. set to 0 for unlimited. default is 3. max-same-issues: 0 diff --git a/Makefile b/Makefile index 8f8fc6bdb..9394b18d2 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ default: fmt get update test lint GO := go GOBUILD := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG) -GOTEST := $(GO) test -gcflags='-l' -p 3 -v -race -timeout 10m -coverprofile=profile.out -covermode=atomic +GOTEST := $(GO) test -gcflags='-l' -race -timeout 10m -coverprofile=profile.out -covermode=atomic FILES := $(shell find . -name '*.go' -type f -not -name '*.pb.go' -not -name '*_generated.go' -not -name '*_test.go') TESTS := $(shell find . -name '*.go' -type f -not -name '*.pb.go' -not -name '*_generated.go' -name '*_test.go') diff --git a/acl_create_request_test.go b/acl_create_request_test.go index 1c636d01f..896b36d6c 100644 --- a/acl_create_request_test.go +++ b/acl_create_request_test.go @@ -25,6 +25,7 @@ var ( ) func TestCreateAclsRequestv0(t *testing.T) { + t.Parallel() req := &CreateAclsRequest{ Version: 0, AclCreations: []*AclCreation{ @@ -47,6 +48,7 @@ func TestCreateAclsRequestv0(t *testing.T) { } func TestCreateAclsRequestv1(t *testing.T) { + t.Parallel() req := &CreateAclsRequest{ Version: 1, AclCreations: []*AclCreation{ diff --git a/acl_create_response_test.go b/acl_create_response_test.go index 65b934d9a..020c8b378 100644 --- a/acl_create_response_test.go +++ b/acl_create_response_test.go @@ -24,6 +24,7 @@ var ( ) func TestCreateAclsResponse(t *testing.T) { + t.Parallel() errmsg := "error" resp := &CreateAclsResponse{ ThrottleTime: 100 * time.Millisecond, diff --git a/acl_delete_request_test.go b/acl_delete_request_test.go index cb126e30f..6b456f5c8 100644 --- a/acl_delete_request_test.go +++ b/acl_delete_request_test.go @@ -63,6 +63,7 @@ var ( ) func TestDeleteAclsRequest(t *testing.T) { + t.Parallel() req := &DeleteAclsRequest{ Filters: []*AclFilter{{ ResourceType: AclResourceAny, @@ -91,6 +92,7 @@ func TestDeleteAclsRequest(t *testing.T) { } func TestDeleteAclsRequestV1(t *testing.T) { + t.Parallel() req := &DeleteAclsRequest{ Version: 1, Filters: []*AclFilter{{ diff --git a/acl_delete_response_test.go b/acl_delete_response_test.go index eb57d68a5..e8ba57753 100644 --- a/acl_delete_response_test.go +++ b/acl_delete_response_test.go @@ -22,6 +22,7 @@ var deleteAclsResponse = []byte{ } func TestDeleteAclsResponse(t *testing.T) { + t.Parallel() resp := &DeleteAclsResponse{ ThrottleTime: 100 * time.Millisecond, FilterResponses: []*FilterResponse{{ diff --git a/acl_describe_request_test.go b/acl_describe_request_test.go index 3cb73eb79..6a090c330 100644 --- a/acl_describe_request_test.go +++ b/acl_describe_request_test.go @@ -25,6 +25,7 @@ var ( ) func TestAclDescribeRequestV0(t *testing.T) { + t.Parallel() resourcename := "topic" principal := "principal" host := "host" @@ -44,6 +45,7 @@ func TestAclDescribeRequestV0(t *testing.T) { } func TestAclDescribeRequestV1(t *testing.T) { + t.Parallel() resourcename := "topic" principal := "principal" host := "host" diff --git a/acl_describe_response_test.go b/acl_describe_response_test.go index f0652cfee..3147c4bc8 100644 --- a/acl_describe_response_test.go +++ b/acl_describe_response_test.go @@ -20,6 +20,7 @@ var aclDescribeResponseError = []byte{ } func TestAclDescribeResponse(t *testing.T) { + t.Parallel() errmsg := "error" resp := &DescribeAclsResponse{ ThrottleTime: 100 * time.Millisecond, diff --git a/acl_types_test.go b/acl_types_test.go index 0b5247a14..65dc3a84c 100644 --- a/acl_types_test.go +++ b/acl_types_test.go @@ -5,6 +5,7 @@ import ( ) func TestAclOperationTextMarshal(t *testing.T) { + t.Parallel() for i := AclOperationUnknown; i <= AclOperationIdempotentWrite; i++ { text, err := i.MarshalText() if err != nil { @@ -22,6 +23,7 @@ func TestAclOperationTextMarshal(t *testing.T) { } func TestAclPermissionTypeTextMarshal(t *testing.T) { + t.Parallel() for i := AclPermissionUnknown; i <= AclPermissionAllow; i++ { text, err := i.MarshalText() if err != nil { @@ -39,6 +41,7 @@ func TestAclPermissionTypeTextMarshal(t *testing.T) { } func TestAclResourceTypeTextMarshal(t *testing.T) { + t.Parallel() for i := AclResourceUnknown; i <= AclResourceTransactionalID; i++ { text, err := i.MarshalText() if err != nil { @@ -56,6 +59,7 @@ func TestAclResourceTypeTextMarshal(t *testing.T) { } func TestAclResourcePatternTypeTextMarshal(t *testing.T) { + t.Parallel() for i := AclPatternUnknown; i <= AclPatternPrefixed; i++ { text, err := i.MarshalText() if err != nil { diff --git a/add_offsets_to_txn_request_test.go b/add_offsets_to_txn_request_test.go index 471d085cd..c3bb3cc12 100644 --- a/add_offsets_to_txn_request_test.go +++ b/add_offsets_to_txn_request_test.go @@ -10,6 +10,7 @@ var addOffsetsToTxnRequest = []byte{ } func TestAddOffsetsToTxnRequest(t *testing.T) { + t.Parallel() req := &AddOffsetsToTxnRequest{ TransactionalID: "txn", ProducerID: 8000, diff --git a/add_offsets_to_txn_response_test.go b/add_offsets_to_txn_response_test.go index d1730cee4..1c2d47b37 100644 --- a/add_offsets_to_txn_response_test.go +++ b/add_offsets_to_txn_response_test.go @@ -11,6 +11,7 @@ var addOffsetsToTxnResponse = []byte{ } func TestAddOffsetsToTxnResponse(t *testing.T) { + t.Parallel() resp := &AddOffsetsToTxnResponse{ ThrottleTime: 100 * time.Millisecond, Err: ErrInvalidProducerEpoch, diff --git a/add_partitions_to_txn_request_test.go b/add_partitions_to_txn_request_test.go index f60a88695..59ae0a5d5 100644 --- a/add_partitions_to_txn_request_test.go +++ b/add_partitions_to_txn_request_test.go @@ -12,6 +12,7 @@ var addPartitionsToTxnRequest = []byte{ } func TestAddPartitionsToTxnRequest(t *testing.T) { + t.Parallel() req := &AddPartitionsToTxnRequest{ TransactionalID: "txn", ProducerID: 8000, diff --git a/add_partitions_to_txn_response_test.go b/add_partitions_to_txn_response_test.go index b3635e58d..ed4902011 100644 --- a/add_partitions_to_txn_response_test.go +++ b/add_partitions_to_txn_response_test.go @@ -15,6 +15,7 @@ var addPartitionsToTxnResponse = []byte{ } func TestAddPartitionsToTxnResponse(t *testing.T) { + t.Parallel() resp := &AddPartitionsToTxnResponse{ ThrottleTime: 100 * time.Millisecond, Errors: map[string][]*PartitionError{ diff --git a/admin_test.go b/admin_test.go index 57b59c374..9441142d7 100644 --- a/admin_test.go +++ b/admin_test.go @@ -7,6 +7,7 @@ import ( ) func TestClusterAdmin(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -30,6 +31,7 @@ func TestClusterAdmin(t *testing.T) { } func TestClusterAdminInvalidController(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -51,6 +53,7 @@ func TestClusterAdminInvalidController(t *testing.T) { } func TestClusterAdminCreateTopic(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -79,6 +82,7 @@ func TestClusterAdminCreateTopic(t *testing.T) { } func TestClusterAdminCreateTopicWithInvalidTopicDetail(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -107,6 +111,7 @@ func TestClusterAdminCreateTopicWithInvalidTopicDetail(t *testing.T) { } func TestClusterAdminCreateTopicWithoutAuthorization(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -137,6 +142,7 @@ func TestClusterAdminCreateTopicWithoutAuthorization(t *testing.T) { } func TestClusterAdminListTopics(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -188,6 +194,7 @@ func TestClusterAdminListTopics(t *testing.T) { } func TestClusterAdminDeleteTopic(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -217,6 +224,7 @@ func TestClusterAdminDeleteTopic(t *testing.T) { } func TestClusterAdminDeleteEmptyTopic(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -246,6 +254,7 @@ func TestClusterAdminDeleteEmptyTopic(t *testing.T) { } func TestClusterAdminCreatePartitions(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -275,6 +284,7 @@ func TestClusterAdminCreatePartitions(t *testing.T) { } func TestClusterAdminCreatePartitionsWithDiffVersion(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -304,6 +314,7 @@ func TestClusterAdminCreatePartitionsWithDiffVersion(t *testing.T) { } func TestClusterAdminCreatePartitionsWithoutAuthorization(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -333,6 +344,7 @@ func TestClusterAdminCreatePartitionsWithoutAuthorization(t *testing.T) { } func TestClusterAdminAlterPartitionReassignments(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -373,6 +385,7 @@ func TestClusterAdminAlterPartitionReassignments(t *testing.T) { } func TestClusterAdminAlterPartitionReassignmentsWithDiffVersion(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -412,6 +425,7 @@ func TestClusterAdminAlterPartitionReassignmentsWithDiffVersion(t *testing.T) { } func TestClusterAdminListPartitionReassignments(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -459,6 +473,7 @@ func TestClusterAdminListPartitionReassignments(t *testing.T) { } func TestClusterAdminListPartitionReassignmentsWithDiffVersion(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -498,6 +513,7 @@ func TestClusterAdminListPartitionReassignmentsWithDiffVersion(t *testing.T) { } func TestClusterAdminDeleteRecords(t *testing.T) { + t.Parallel() topicName := "my_topic" seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -543,6 +559,7 @@ func TestClusterAdminDeleteRecords(t *testing.T) { } func TestClusterAdminDeleteRecordsWithInCorrectBroker(t *testing.T) { + t.Parallel() topicName := "my_topic" seedBroker := NewMockBroker(t, 1) secondBroker := NewMockBroker(t, 2) @@ -594,6 +611,7 @@ func TestClusterAdminDeleteRecordsWithInCorrectBroker(t *testing.T) { } func TestClusterAdminDeleteRecordsWithDiffVersion(t *testing.T) { + t.Parallel() topicName := "my_topic" seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -643,6 +661,7 @@ func TestClusterAdminDeleteRecordsWithDiffVersion(t *testing.T) { } func TestClusterAdminDescribeConfig(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -709,6 +728,7 @@ func TestClusterAdminDescribeConfig(t *testing.T) { } func TestClusterAdminDescribeConfigWithErrorCode(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -744,6 +764,7 @@ func TestClusterAdminDescribeConfigWithErrorCode(t *testing.T) { // TestClusterAdminDescribeBrokerConfig ensures that a describe broker config // is sent to the broker in the resource struct, _not_ the controller func TestClusterAdminDescribeBrokerConfig(t *testing.T) { + t.Parallel() controllerBroker := NewMockBroker(t, 1) defer controllerBroker.Close() configBroker := NewMockBroker(t, 2) @@ -794,6 +815,7 @@ func TestClusterAdminDescribeBrokerConfig(t *testing.T) { } func TestClusterAdminAlterConfig(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -827,6 +849,7 @@ func TestClusterAdminAlterConfig(t *testing.T) { } func TestClusterAdminAlterConfigWithErrorCode(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -858,6 +881,7 @@ func TestClusterAdminAlterConfigWithErrorCode(t *testing.T) { } func TestClusterAdminAlterBrokerConfig(t *testing.T) { + t.Parallel() controllerBroker := NewMockBroker(t, 1) defer controllerBroker.Close() configBroker := NewMockBroker(t, 2) @@ -912,6 +936,7 @@ func TestClusterAdminAlterBrokerConfig(t *testing.T) { } func TestClusterAdminIncrementalAlterConfig(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -953,6 +978,7 @@ func TestClusterAdminIncrementalAlterConfig(t *testing.T) { } func TestClusterAdminIncrementalAlterConfigWithErrorCode(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -992,6 +1018,7 @@ func TestClusterAdminIncrementalAlterConfigWithErrorCode(t *testing.T) { } func TestClusterAdminIncrementalAlterBrokerConfig(t *testing.T) { + t.Parallel() controllerBroker := NewMockBroker(t, 1) defer controllerBroker.Close() configBroker := NewMockBroker(t, 2) @@ -1054,6 +1081,7 @@ func TestClusterAdminIncrementalAlterBrokerConfig(t *testing.T) { } func TestClusterAdminCreateAcl(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -1086,6 +1114,7 @@ func TestClusterAdminCreateAcl(t *testing.T) { } func TestClusterAdminListAcls(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -1133,6 +1162,7 @@ func TestClusterAdminListAcls(t *testing.T) { } func TestClusterAdminDeleteAcl(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -1169,6 +1199,7 @@ func TestClusterAdminDeleteAcl(t *testing.T) { } func TestDescribeTopic(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -1207,6 +1238,7 @@ func TestDescribeTopic(t *testing.T) { } func TestDescribeTopicWithVersion0_11(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -1245,6 +1277,7 @@ func TestDescribeTopicWithVersion0_11(t *testing.T) { } func TestDescribeConsumerGroup(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -1288,6 +1321,7 @@ func TestDescribeConsumerGroup(t *testing.T) { } func TestListConsumerGroups(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -1333,6 +1367,7 @@ func TestListConsumerGroups(t *testing.T) { } func TestListConsumerGroupsMultiBroker(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -1397,6 +1432,7 @@ func TestListConsumerGroupsMultiBroker(t *testing.T) { } func TestListConsumerGroupOffsets(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -1444,6 +1480,7 @@ func TestListConsumerGroupOffsets(t *testing.T) { } func TestDeleteConsumerGroup(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -1473,6 +1510,7 @@ func TestDeleteConsumerGroup(t *testing.T) { } func TestDeleteOffset(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -1525,6 +1563,7 @@ func TestDeleteOffset(t *testing.T) { // TestRefreshMetaDataWithDifferentController ensures that the cached // controller can be forcibly updated from Metadata by the admin client func TestRefreshMetaDataWithDifferentController(t *testing.T) { + t.Parallel() seedBroker1 := NewMockBroker(t, 1) seedBroker2 := NewMockBroker(t, 2) defer seedBroker1.Close() @@ -1571,6 +1610,7 @@ func TestRefreshMetaDataWithDifferentController(t *testing.T) { } func TestDescribeLogDirs(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() diff --git a/alter_client_quotas_request_test.go b/alter_client_quotas_request_test.go index 46f92b9cb..404f69f92 100644 --- a/alter_client_quotas_request_test.go +++ b/alter_client_quotas_request_test.go @@ -63,6 +63,7 @@ var ( ) func TestAlterClientQuotasRequest(t *testing.T) { + t.Parallel() // default user defaultUserComponent := QuotaEntityComponent{ EntityType: QuotaEntityUser, diff --git a/alter_client_quotas_response_test.go b/alter_client_quotas_response_test.go index 40efeb547..a767cf984 100644 --- a/alter_client_quotas_response_test.go +++ b/alter_client_quotas_response_test.go @@ -40,6 +40,7 @@ var ( ) func TestAlterClientQuotasResponse(t *testing.T) { + t.Parallel() // default user defaultUserComponent := QuotaEntityComponent{ EntityType: QuotaEntityUser, diff --git a/alter_configs_request_test.go b/alter_configs_request_test.go index e612244c6..97e2679bb 100644 --- a/alter_configs_request_test.go +++ b/alter_configs_request_test.go @@ -41,6 +41,7 @@ var ( ) func TestAlterConfigsRequest(t *testing.T) { + t.Parallel() var request *AlterConfigsRequest request = &AlterConfigsRequest{ diff --git a/alter_configs_response_test.go b/alter_configs_response_test.go index 62cd5991f..ca8afbe9e 100644 --- a/alter_configs_response_test.go +++ b/alter_configs_response_test.go @@ -21,6 +21,7 @@ var ( ) func TestAlterConfigsResponse(t *testing.T) { + t.Parallel() var response *AlterConfigsResponse response = &AlterConfigsResponse{ diff --git a/alter_partition_reassignments_request_test.go b/alter_partition_reassignments_request_test.go index c917f2d79..e60d61096 100644 --- a/alter_partition_reassignments_request_test.go +++ b/alter_partition_reassignments_request_test.go @@ -33,6 +33,7 @@ var ( ) func TestAlterPartitionReassignmentRequest(t *testing.T) { + t.Parallel() var request *AlterPartitionReassignmentsRequest request = &AlterPartitionReassignmentsRequest{ diff --git a/alter_partition_reassignments_response_test.go b/alter_partition_reassignments_response_test.go index 8c39ef6ce..4a513c1f3 100644 --- a/alter_partition_reassignments_response_test.go +++ b/alter_partition_reassignments_response_test.go @@ -26,6 +26,7 @@ var ( ) func TestAlterPartitionReassignmentResponse(t *testing.T) { + t.Parallel() var response *AlterPartitionReassignmentsResponse = &AlterPartitionReassignmentsResponse{ ThrottleTimeMs: int32(10000), Version: int16(0), diff --git a/alter_user_scram_credentials_request_test.go b/alter_user_scram_credentials_request_test.go index 6fe881906..8ba7f9f20 100644 --- a/alter_user_scram_credentials_request_test.go +++ b/alter_user_scram_credentials_request_test.go @@ -31,6 +31,7 @@ var ( ) func TestAlterUserScramCredentialsRequest(t *testing.T) { + t.Parallel() request := &AlterUserScramCredentialsRequest{ Version: 0, Deletions: []AlterUserScramCredentialsDelete{}, diff --git a/alter_user_scram_credentials_response_test.go b/alter_user_scram_credentials_response_test.go index 983500639..31607051f 100644 --- a/alter_user_scram_credentials_response_test.go +++ b/alter_user_scram_credentials_response_test.go @@ -23,6 +23,7 @@ var ( ) func TestAlterUserScramCredentialsResponse(t *testing.T) { + t.Parallel() response := &AlterUserScramCredentialsResponse{ Version: 0, ThrottleTime: time.Second * 3, diff --git a/api_versions_request_test.go b/api_versions_request_test.go index 371ac9602..15b84f346 100644 --- a/api_versions_request_test.go +++ b/api_versions_request_test.go @@ -13,11 +13,13 @@ var ( ) func TestApiVersionsRequest(t *testing.T) { + t.Parallel() request := new(ApiVersionsRequest) testRequest(t, "basic", request, apiVersionRequest) } func TestApiVersionsRequestV3(t *testing.T) { + t.Parallel() request := new(ApiVersionsRequest) request.Version = 3 request.ClientSoftwareName = "sarama" diff --git a/api_versions_response_test.go b/api_versions_response_test.go index 26e0bf2a4..6574c8317 100644 --- a/api_versions_response_test.go +++ b/api_versions_response_test.go @@ -24,6 +24,7 @@ var ( ) func TestApiVersionsResponse(t *testing.T) { + t.Parallel() response := new(ApiVersionsResponse) testVersionDecodable(t, "no error", response, apiVersionResponse, 0) if response.ErrorCode != int16(ErrNoError) { @@ -41,6 +42,7 @@ func TestApiVersionsResponse(t *testing.T) { } func TestApiVersionsResponseV3(t *testing.T) { + t.Parallel() response := new(ApiVersionsResponse) response.Version = 3 testVersionDecodable(t, "no error", response, apiVersionResponseV3, 3) diff --git a/async_producer_test.go b/async_producer_test.go index e571aa068..d79149ed5 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -99,6 +99,7 @@ func (f flakyEncoder) Encode() ([]byte, error) { } func TestAsyncProducer(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) @@ -148,6 +149,7 @@ done: } func TestAsyncProducerMultipleFlushes(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) @@ -183,6 +185,7 @@ func TestAsyncProducerMultipleFlushes(t *testing.T) { } func TestAsyncProducerMultipleBrokers(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) leader0 := NewMockBroker(t, 2) leader1 := NewMockBroker(t, 3) @@ -223,6 +226,7 @@ func TestAsyncProducerMultipleBrokers(t *testing.T) { } func TestAsyncProducerCustomPartitioner(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) @@ -265,6 +269,7 @@ func TestAsyncProducerCustomPartitioner(t *testing.T) { } func TestAsyncProducerFailureRetry(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) leader1 := NewMockBroker(t, 2) leader2 := NewMockBroker(t, 3) @@ -313,6 +318,7 @@ func TestAsyncProducerFailureRetry(t *testing.T) { } func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) { + t.Parallel() tt := func(t *testing.T, kErr KError) { seedBroker := NewMockBroker(t, 0) broker1 := NewMockBroker(t, 1) @@ -390,15 +396,18 @@ func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) { } t.Run("retriable error", func(t *testing.T) { + t.Parallel() tt(t, ErrNotLeaderForPartition) }) t.Run("non-retriable error", func(t *testing.T) { + t.Parallel() tt(t, ErrNotController) }) } func TestAsyncProducerEncoderFailures(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) @@ -437,6 +446,7 @@ func TestAsyncProducerEncoderFailures(t *testing.T) { // If a Kafka broker becomes unavailable and then returns back in service, then // producer reconnects to it and continues sending messages. func TestAsyncProducerBrokerBounce(t *testing.T) { + t.Parallel() // Given seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) @@ -478,6 +488,7 @@ func TestAsyncProducerBrokerBounce(t *testing.T) { } func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) leader1 := NewMockBroker(t, 2) leader2 := NewMockBroker(t, 3) @@ -521,6 +532,7 @@ func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) { } func TestAsyncProducerMultipleRetries(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) leader1 := NewMockBroker(t, 2) leader2 := NewMockBroker(t, 3) @@ -577,6 +589,7 @@ func TestAsyncProducerMultipleRetries(t *testing.T) { } func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) leader1 := NewMockBroker(t, 2) leader2 := NewMockBroker(t, 3) @@ -644,6 +657,7 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) { } func TestAsyncProducerOutOfRetries(t *testing.T) { + t.Parallel() t.Skip("Enable once bug #294 is fixed.") seedBroker := NewMockBroker(t, 1) @@ -701,6 +715,7 @@ func TestAsyncProducerOutOfRetries(t *testing.T) { } func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) leaderAddr := leader.Addr() @@ -758,6 +773,7 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) { } func TestAsyncProducerFlusherRetryCondition(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) @@ -824,6 +840,7 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) { } func TestAsyncProducerRetryShutdown(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) @@ -873,6 +890,7 @@ func TestAsyncProducerRetryShutdown(t *testing.T) { } func TestAsyncProducerNoReturns(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) @@ -913,6 +931,7 @@ func TestAsyncProducerNoReturns(t *testing.T) { } func TestAsyncProducerIdempotentGoldenPath(t *testing.T) { + t.Parallel() broker := NewMockBroker(t, 1) metadataResponse := &MetadataResponse{ @@ -961,6 +980,7 @@ func TestAsyncProducerIdempotentGoldenPath(t *testing.T) { } func TestAsyncProducerIdempotentRetryCheckBatch(t *testing.T) { + t.Parallel() // Logger = log.New(os.Stderr, "", log.LstdFlags) tests := []struct { name string @@ -1108,6 +1128,7 @@ func TestAsyncProducerIdempotentRetryCheckBatch(t *testing.T) { } func TestAsyncProducerIdempotentErrorOnOutOfSeq(t *testing.T) { + t.Parallel() broker := NewMockBroker(t, 1) metadataResponse := &MetadataResponse{ @@ -1157,6 +1178,7 @@ func TestAsyncProducerIdempotentErrorOnOutOfSeq(t *testing.T) { } func TestAsyncProducerIdempotentEpochRollover(t *testing.T) { + t.Parallel() broker := NewMockBroker(t, 1) defer broker.Close() @@ -1227,6 +1249,7 @@ func TestAsyncProducerIdempotentEpochRollover(t *testing.T) { // TestBrokerProducerShutdown ensures that a call to shutdown stops the // brokerProducer run() loop and doesn't leak any goroutines +//nolint:paralleltest func TestBrokerProducerShutdown(t *testing.T) { defer leaktest.Check(t)() metrics.UseNilMetrics = true // disable Sarama's go-metrics library @@ -1321,6 +1344,7 @@ func testProducerInterceptor( } func TestAsyncProducerInterceptors(t *testing.T) { + t.Parallel() tests := []struct { name string interceptors []ProducerInterceptor @@ -1372,7 +1396,11 @@ func TestAsyncProducerInterceptors(t *testing.T) { }, } for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { testProducerInterceptor(t, tt.interceptors, tt.expectationFn) }) + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + testProducerInterceptor(t, tt.interceptors, tt.expectationFn) + }) } } diff --git a/balance_strategy_test.go b/balance_strategy_test.go index adae80caa..4da317cfc 100644 --- a/balance_strategy_test.go +++ b/balance_strategy_test.go @@ -12,6 +12,7 @@ import ( ) func TestBalanceStrategyRange(t *testing.T) { + t.Parallel() tests := []struct { members map[string][]string topics map[string][]int32 @@ -64,6 +65,7 @@ func TestBalanceStrategyRange(t *testing.T) { } func TestBalanceStrategyRangeAssignmentData(t *testing.T) { + t.Parallel() strategy := BalanceStrategyRange members := make(map[string]ConsumerGroupMemberMetadata, 2) @@ -84,6 +86,7 @@ func TestBalanceStrategyRangeAssignmentData(t *testing.T) { } func TestBalanceStrategyRoundRobin(t *testing.T) { + t.Parallel() tests := []struct { members map[string][]string topics map[string][]int32 @@ -166,6 +169,7 @@ func TestBalanceStrategyRoundRobin(t *testing.T) { } func Test_deserializeTopicPartitionAssignment(t *testing.T) { + t.Parallel() type args struct { userDataBytes []byte } @@ -237,7 +241,9 @@ func Test_deserializeTopicPartitionAssignment(t *testing.T) { }, } for _, tt := range tests { + tt := tt t.Run(tt.name, func(t *testing.T) { + t.Parallel() got, err := deserializeTopicPartitionAssignment(tt.args.userDataBytes) if (err != nil) != tt.wantErr { t.Errorf("deserializeTopicPartitionAssignment() error = %v, wantErr %v", err, tt.wantErr) @@ -251,6 +257,7 @@ func Test_deserializeTopicPartitionAssignment(t *testing.T) { } func TestBalanceStrategyRoundRobinAssignmentData(t *testing.T) { + t.Parallel() strategy := BalanceStrategyRoundRobin members := make(map[string]ConsumerGroupMemberMetadata, 2) @@ -271,6 +278,7 @@ func TestBalanceStrategyRoundRobinAssignmentData(t *testing.T) { } func Test_prepopulateCurrentAssignments(t *testing.T) { + t.Parallel() type args struct { members map[string]ConsumerGroupMemberMetadata } @@ -408,7 +416,9 @@ func Test_prepopulateCurrentAssignments(t *testing.T) { }, } for _, tt := range tests { + tt := tt t.Run(tt.name, func(t *testing.T) { + t.Parallel() _, gotPrevAssignments, err := prepopulateCurrentAssignments(tt.args.members) if (err != nil) != tt.wantErr { @@ -423,6 +433,7 @@ func Test_prepopulateCurrentAssignments(t *testing.T) { } func Test_areSubscriptionsIdentical(t *testing.T) { + t.Parallel() type args struct { partition2AllPotentialConsumers map[topicPartitionAssignment][]string consumer2AllPotentialPartitions map[string][]topicPartitionAssignment @@ -538,7 +549,9 @@ func Test_areSubscriptionsIdentical(t *testing.T) { }, } for _, tt := range tests { + tt := tt t.Run(tt.name, func(t *testing.T) { + t.Parallel() if got := areSubscriptionsIdentical(tt.args.partition2AllPotentialConsumers, tt.args.consumer2AllPotentialPartitions); got != tt.want { t.Errorf("areSubscriptionsIdentical() = %v, want %v", got, tt.want) } @@ -547,6 +560,7 @@ func Test_areSubscriptionsIdentical(t *testing.T) { } func Test_sortMemberIDsByPartitionAssignments(t *testing.T) { + t.Parallel() type args struct { assignments map[string][]topicPartitionAssignment } @@ -594,7 +608,9 @@ func Test_sortMemberIDsByPartitionAssignments(t *testing.T) { }, } for _, tt := range tests { + tt := tt t.Run(tt.name, func(t *testing.T) { + t.Parallel() if got := sortMemberIDsByPartitionAssignments(tt.args.assignments); !reflect.DeepEqual(got, tt.want) { t.Errorf("sortMemberIDsByPartitionAssignments() = %v, want %v", got, tt.want) } @@ -603,6 +619,7 @@ func Test_sortMemberIDsByPartitionAssignments(t *testing.T) { } func Test_sortPartitions(t *testing.T) { + t.Parallel() type args struct { currentAssignment map[string][]topicPartitionAssignment partitionsWithADifferentPreviousAssignment map[topicPartitionAssignment]consumerGenerationPair @@ -704,7 +721,9 @@ func Test_sortPartitions(t *testing.T) { }, } for _, tt := range tests { + tt := tt t.Run(tt.name, func(t *testing.T) { + t.Parallel() got := sortPartitions(tt.args.currentAssignment, tt.args.partitionsWithADifferentPreviousAssignment, tt.args.isFreshAssignment, tt.args.partition2AllPotentialConsumers, tt.args.consumer2AllPotentialPartitions) if tt.want != nil && !reflect.DeepEqual(got, tt.want) { t.Errorf("sortPartitions() = %v, want %v", got, tt.want) @@ -714,6 +733,7 @@ func Test_sortPartitions(t *testing.T) { } func Test_filterAssignedPartitions(t *testing.T) { + t.Parallel() type args struct { currentAssignment map[string][]topicPartitionAssignment partition2AllPotentialConsumers map[topicPartitionAssignment][]string @@ -777,7 +797,9 @@ func Test_filterAssignedPartitions(t *testing.T) { }, } for _, tt := range tests { + tt := tt t.Run(tt.name, func(t *testing.T) { + t.Parallel() if got := filterAssignedPartitions(tt.args.currentAssignment, tt.args.partition2AllPotentialConsumers); !reflect.DeepEqual(got, tt.want) { t.Errorf("filterAssignedPartitions() = %v, want %v", got, tt.want) } @@ -786,6 +808,7 @@ func Test_filterAssignedPartitions(t *testing.T) { } func Test_canConsumerParticipateInReassignment(t *testing.T) { + t.Parallel() type args struct { memberID string currentAssignment map[string][]topicPartitionAssignment @@ -878,7 +901,9 @@ func Test_canConsumerParticipateInReassignment(t *testing.T) { }, } for _, tt := range tests { + tt := tt t.Run(tt.name, func(t *testing.T) { + t.Parallel() if got := canConsumerParticipateInReassignment(tt.args.memberID, tt.args.currentAssignment, tt.args.consumer2AllPotentialPartitions, tt.args.partition2AllPotentialConsumers); got != tt.want { t.Errorf("canConsumerParticipateInReassignment() = %v, want %v", got, tt.want) } @@ -887,6 +912,7 @@ func Test_canConsumerParticipateInReassignment(t *testing.T) { } func Test_removeTopicPartitionFromMemberAssignments(t *testing.T) { + t.Parallel() type args struct { assignments []topicPartitionAssignment topic topicPartitionAssignment @@ -951,7 +977,9 @@ func Test_removeTopicPartitionFromMemberAssignments(t *testing.T) { }, } for _, tt := range tests { + tt := tt t.Run(tt.name, func(t *testing.T) { + t.Parallel() if got := removeTopicPartitionFromMemberAssignments(tt.args.assignments, tt.args.topic); !reflect.DeepEqual(got, tt.want) { t.Errorf("removeTopicPartitionFromMemberAssignments() = %v, want %v", got, tt.want) } @@ -960,6 +988,7 @@ func Test_removeTopicPartitionFromMemberAssignments(t *testing.T) { } func Test_assignPartition(t *testing.T) { + t.Parallel() type args struct { partition topicPartitionAssignment sortedCurrentSubscriptions []string @@ -1069,7 +1098,9 @@ func Test_assignPartition(t *testing.T) { }, } for _, tt := range tests { + tt := tt t.Run(tt.name, func(t *testing.T) { + t.Parallel() if got := assignPartition(tt.args.partition, tt.args.sortedCurrentSubscriptions, tt.args.currentAssignment, tt.args.consumer2AllPotentialPartitions, tt.args.currentPartitionConsumer); !reflect.DeepEqual(got, tt.want) { t.Errorf("assignPartition() = %v, want %v", got, tt.want) } @@ -1084,6 +1115,7 @@ func Test_assignPartition(t *testing.T) { } func Test_stickyBalanceStrategy_Plan(t *testing.T) { + t.Parallel() type args struct { members map[string]ConsumerGroupMemberMetadata topics map[string][]int32 @@ -1303,7 +1335,9 @@ func Test_stickyBalanceStrategy_Plan(t *testing.T) { }, } for _, tt := range tests { + tt := tt t.Run(tt.name, func(t *testing.T) { + t.Parallel() s := &stickyBalanceStrategy{} plan, err := s.Plan(tt.args.members, tt.args.topics) verifyPlanIsBalancedAndSticky(t, s, tt.args.members, plan, err) @@ -1313,6 +1347,7 @@ func Test_stickyBalanceStrategy_Plan(t *testing.T) { } func Test_stickyBalanceStrategy_Plan_KIP54_ExampleOne(t *testing.T) { + t.Parallel() s := &stickyBalanceStrategy{} // PLAN 1 @@ -1353,6 +1388,7 @@ func Test_stickyBalanceStrategy_Plan_KIP54_ExampleOne(t *testing.T) { } func Test_stickyBalanceStrategy_Plan_KIP54_ExampleTwo(t *testing.T) { + t.Parallel() s := &stickyBalanceStrategy{} // PLAN 1 @@ -1397,6 +1433,7 @@ func Test_stickyBalanceStrategy_Plan_KIP54_ExampleTwo(t *testing.T) { } func Test_stickyBalanceStrategy_Plan_KIP54_ExampleThree(t *testing.T) { + t.Parallel() s := &stickyBalanceStrategy{} topicNames := []string{"topic1", "topic2"} @@ -1434,6 +1471,7 @@ func Test_stickyBalanceStrategy_Plan_KIP54_ExampleThree(t *testing.T) { } func Test_stickyBalanceStrategy_Plan_AddRemoveConsumerOneTopic(t *testing.T) { + t.Parallel() s := &stickyBalanceStrategy{} // PLAN 1 @@ -1470,6 +1508,7 @@ func Test_stickyBalanceStrategy_Plan_AddRemoveConsumerOneTopic(t *testing.T) { } func Test_stickyBalanceStrategy_Plan_PoorRoundRobinAssignmentScenario(t *testing.T) { + t.Parallel() s := &stickyBalanceStrategy{} // PLAN 1 @@ -1501,6 +1540,7 @@ func Test_stickyBalanceStrategy_Plan_PoorRoundRobinAssignmentScenario(t *testing } func Test_stickyBalanceStrategy_Plan_AddRemoveTopicTwoConsumers(t *testing.T) { + t.Parallel() s := &stickyBalanceStrategy{} // PLAN 1 @@ -1551,6 +1591,7 @@ func Test_stickyBalanceStrategy_Plan_AddRemoveTopicTwoConsumers(t *testing.T) { } func Test_stickyBalanceStrategy_Plan_ReassignmentAfterOneConsumerLeaves(t *testing.T) { + t.Parallel() s := &stickyBalanceStrategy{} // PLAN 1 @@ -1591,6 +1632,7 @@ func Test_stickyBalanceStrategy_Plan_ReassignmentAfterOneConsumerLeaves(t *testi } func Test_stickyBalanceStrategy_Plan_ReassignmentAfterOneConsumerAdded(t *testing.T) { + t.Parallel() s := &stickyBalanceStrategy{} // PLAN 1 @@ -1615,6 +1657,7 @@ func Test_stickyBalanceStrategy_Plan_ReassignmentAfterOneConsumerAdded(t *testin } func Test_stickyBalanceStrategy_Plan_SameSubscriptions(t *testing.T) { + t.Parallel() s := &stickyBalanceStrategy{} // PLAN 1 @@ -1652,6 +1695,7 @@ func Test_stickyBalanceStrategy_Plan_SameSubscriptions(t *testing.T) { } func Test_stickyBalanceStrategy_Plan_LargeAssignmentWithMultipleConsumersLeaving(t *testing.T) { + t.Parallel() s := &stickyBalanceStrategy{} r := rand.New(rand.NewSource(time.Now().UnixNano())) @@ -1692,6 +1736,7 @@ func Test_stickyBalanceStrategy_Plan_LargeAssignmentWithMultipleConsumersLeaving } func Test_stickyBalanceStrategy_Plan_NewSubscription(t *testing.T) { + t.Parallel() s := &stickyBalanceStrategy{} members := make(map[string]ConsumerGroupMemberMetadata, 20) @@ -1721,6 +1766,7 @@ func Test_stickyBalanceStrategy_Plan_NewSubscription(t *testing.T) { } func Test_stickyBalanceStrategy_Plan_ReassignmentWithRandomSubscriptionsAndChanges(t *testing.T) { + t.Parallel() r := rand.New(rand.NewSource(time.Now().UnixNano())) minNumConsumers := 20 @@ -1770,6 +1816,7 @@ func Test_stickyBalanceStrategy_Plan_ReassignmentWithRandomSubscriptionsAndChang } func Test_stickyBalanceStrategy_Plan_MoveExistingAssignments(t *testing.T) { + t.Parallel() s := &stickyBalanceStrategy{} topics := make(map[string][]int32, 6) @@ -1795,6 +1842,7 @@ func Test_stickyBalanceStrategy_Plan_MoveExistingAssignments(t *testing.T) { } func Test_stickyBalanceStrategy_Plan_Stickiness(t *testing.T) { + t.Parallel() s := &stickyBalanceStrategy{} topics := map[string][]int32{"topic1": {0, 1, 2}} @@ -1823,6 +1871,7 @@ func Test_stickyBalanceStrategy_Plan_Stickiness(t *testing.T) { } func Test_stickyBalanceStrategy_Plan_AssignmentUpdatedForDeletedTopic(t *testing.T) { + t.Parallel() s := &stickyBalanceStrategy{} topics := make(map[string][]int32, 2) @@ -1845,6 +1894,7 @@ func Test_stickyBalanceStrategy_Plan_AssignmentUpdatedForDeletedTopic(t *testing } func Test_stickyBalanceStrategy_Plan_NoExceptionRaisedWhenOnlySubscribedTopicDeleted(t *testing.T) { + t.Parallel() s := &stickyBalanceStrategy{} topics := map[string][]int32{"topic1": {0, 1, 2}} @@ -1873,6 +1923,7 @@ func Test_stickyBalanceStrategy_Plan_NoExceptionRaisedWhenOnlySubscribedTopicDel } func Test_stickyBalanceStrategy_Plan_AssignmentWithMultipleGenerations1(t *testing.T) { + t.Parallel() s := &stickyBalanceStrategy{} topics := map[string][]int32{"topic1": {0, 1, 2, 3, 4, 5}} @@ -1923,6 +1974,7 @@ func Test_stickyBalanceStrategy_Plan_AssignmentWithMultipleGenerations1(t *testi } func Test_stickyBalanceStrategy_Plan_AssignmentWithMultipleGenerations2(t *testing.T) { + t.Parallel() s := &stickyBalanceStrategy{} topics := map[string][]int32{"topic1": {0, 1, 2, 3, 4, 5}} @@ -1969,6 +2021,7 @@ func Test_stickyBalanceStrategy_Plan_AssignmentWithMultipleGenerations2(t *testi } func Test_stickyBalanceStrategy_Plan_AssignmentWithConflictingPreviousGenerations(t *testing.T) { + t.Parallel() s := &stickyBalanceStrategy{} topics := map[string][]int32{"topic1": {0, 1, 2, 3, 4, 5}} @@ -1992,6 +2045,7 @@ func Test_stickyBalanceStrategy_Plan_AssignmentWithConflictingPreviousGeneration } func Test_stickyBalanceStrategy_Plan_SchemaBackwardCompatibility(t *testing.T) { + t.Parallel() s := &stickyBalanceStrategy{} topics := map[string][]int32{"topic1": {0, 1, 2}} @@ -2012,6 +2066,7 @@ func Test_stickyBalanceStrategy_Plan_SchemaBackwardCompatibility(t *testing.T) { } func Test_stickyBalanceStrategy_Plan_ConflictingPreviousAssignments(t *testing.T) { + t.Parallel() s := &stickyBalanceStrategy{} topics := map[string][]int32{"topic1": {0, 1}} @@ -2031,6 +2086,7 @@ func Test_stickyBalanceStrategy_Plan_ConflictingPreviousAssignments(t *testing.T } func Test_stickyBalanceStrategy_Plan_AssignmentData(t *testing.T) { + t.Parallel() s := &stickyBalanceStrategy{} members := make(map[string]ConsumerGroupMemberMetadata, 2) @@ -2320,6 +2376,7 @@ func getRandomSublist(r *rand.Rand, s []string) []string { } func Test_sortPartitionsByPotentialConsumerAssignments(t *testing.T) { + t.Parallel() type args struct { partition2AllPotentialConsumers map[topicPartitionAssignment][]string } @@ -2397,7 +2454,9 @@ func Test_sortPartitionsByPotentialConsumerAssignments(t *testing.T) { }, } for _, tt := range tests { + tt := tt t.Run(tt.name, func(t *testing.T) { + t.Parallel() if got := sortPartitionsByPotentialConsumerAssignments(tt.args.partition2AllPotentialConsumers); !reflect.DeepEqual(got, tt.want) { t.Errorf("sortPartitionsByPotentialConsumerAssignments() = %v, want %v", got, tt.want) } diff --git a/broker_test.go b/broker_test.go index a9e7da60e..909995bf9 100644 --- a/broker_test.go +++ b/broker_test.go @@ -52,6 +52,7 @@ type brokerMetrics struct { } func TestBrokerAccessors(t *testing.T) { + t.Parallel() broker := NewBroker("abc:123") if broker.ID() != -1 { @@ -111,8 +112,11 @@ func (p produceResponsePromise) Get() (*ProduceResponse, error) { } func TestSimpleBrokerCommunication(t *testing.T) { + t.Parallel() for _, tt := range brokerTestTable { + tt := tt t.Run(tt.name, func(t *testing.T) { + t.Parallel() Logger.Printf("Testing broker communication for %s", tt.name) mb := NewMockBroker(t, 0) mb.Returns(&mockEncoder{tt.response}) @@ -151,8 +155,11 @@ func TestSimpleBrokerCommunication(t *testing.T) { } func TestBrokerFailedRequest(t *testing.T) { + t.Parallel() for _, tt := range brokerFailedReqTestTable { + tt := tt t.Run(tt.name, func(t *testing.T) { + t.Parallel() t.Logf("Testing broker communication for %s", tt.name) mb := NewMockBroker(t, 0) if !tt.stopBroker { @@ -210,6 +217,7 @@ func newTokenProvider(token *AccessToken, err error) *TokenProvider { } func TestSASLOAuthBearer(t *testing.T) { + t.Parallel() testTable := []struct { name string authidentity string @@ -280,7 +288,9 @@ func TestSASLOAuthBearer(t *testing.T) { } for i, test := range testTable { + test := test t.Run(test.name, func(t *testing.T) { + t.Parallel() // mockBroker mocks underlying network logic and broker responses mockBroker := NewMockBroker(t, 0) @@ -363,6 +373,7 @@ func (m *MockSCRAMClient) Done() bool { var _ SCRAMClient = &MockSCRAMClient{} func TestSASLSCRAMSHAXXX(t *testing.T) { + t.Parallel() testTable := []struct { name string mockHandshakeErr KError @@ -402,7 +413,9 @@ func TestSASLSCRAMSHAXXX(t *testing.T) { } for i, test := range testTable { + test := test t.Run(test.name, func(t *testing.T) { + t.Parallel() // mockBroker mocks underlying network logic and broker responses mockBroker := NewMockBroker(t, 0) broker := NewBroker(mockBroker.Addr()) @@ -473,6 +486,7 @@ func TestSASLSCRAMSHAXXX(t *testing.T) { } func TestSASLPlainAuth(t *testing.T) { + t.Parallel() testTable := []struct { name string authidentity string @@ -504,7 +518,9 @@ func TestSASLPlainAuth(t *testing.T) { } for i, test := range testTable { + test := test t.Run(test.name, func(t *testing.T) { + t.Parallel() // mockBroker mocks underlying network logic and broker responses mockBroker := NewMockBroker(t, 0) @@ -601,6 +617,7 @@ func TestSASLPlainAuth(t *testing.T) { // TestSASLReadTimeout ensures that the broker connection won't block forever // if the remote end never responds after the handshake func TestSASLReadTimeout(t *testing.T) { + t.Parallel() mockBroker := NewMockBroker(t, 0) defer mockBroker.Close() @@ -649,6 +666,7 @@ func TestSASLReadTimeout(t *testing.T) { } func TestGSSAPIKerberosAuth_Authorize(t *testing.T) { + t.Parallel() testTable := []struct { name string error error @@ -697,7 +715,9 @@ func TestGSSAPIKerberosAuth_Authorize(t *testing.T) { }, } for i, test := range testTable { + test := test t.Run(test.name, func(t *testing.T) { + t.Parallel() mockBroker := NewMockBroker(t, 0) // broker executes SASL requests against mockBroker @@ -769,6 +789,7 @@ func TestGSSAPIKerberosAuth_Authorize(t *testing.T) { } func TestBuildClientFirstMessage(t *testing.T) { + t.Parallel() testTable := []struct { name string token *AccessToken @@ -805,7 +826,9 @@ func TestBuildClientFirstMessage(t *testing.T) { } for i, test := range testTable { + test := test t.Run(test.name, func(t *testing.T) { + t.Parallel() actual, err := buildClientFirstMessage(test.token) if !reflect.DeepEqual(test.expected, actual) { diff --git a/client_test.go b/client_test.go index 8cb29d5af..f5253a564 100644 --- a/client_test.go +++ b/client_test.go @@ -1,7 +1,6 @@ package sarama import ( - "fmt" "io" "sync" "sync/atomic" @@ -17,6 +16,7 @@ func safeClose(t testing.TB, c io.Closer) { } func TestSimpleClient(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) seedBroker.Returns(new(MetadataResponse)) @@ -31,6 +31,7 @@ func TestSimpleClient(t *testing.T) { } func TestCachedPartitions(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) replicas := []int32{3, 1, 5} @@ -70,6 +71,7 @@ func TestCachedPartitions(t *testing.T) { } func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) replicas := []int32{seedBroker.BrokerID()} @@ -124,6 +126,7 @@ func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) { } func TestClientSeedBrokers(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) metadataResponse := new(MetadataResponse) @@ -140,6 +143,7 @@ func TestClientSeedBrokers(t *testing.T) { } func TestClientMetadata(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 5) @@ -215,6 +219,7 @@ func TestClientMetadata(t *testing.T) { } func TestClientMetadataWithOfflineReplicas(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 5) @@ -303,6 +308,7 @@ func TestClientMetadataWithOfflineReplicas(t *testing.T) { } func TestClientGetOffset(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) leaderAddr := leader.Addr() @@ -351,6 +357,7 @@ func TestClientGetOffset(t *testing.T) { } func TestClientReceivingUnknownTopicWithBackoffFunc(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) metadataResponse1 := new(MetadataResponse) @@ -388,6 +395,7 @@ func TestClientReceivingUnknownTopicWithBackoffFunc(t *testing.T) { } func TestClientReceivingUnknownTopic(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) metadataResponse1 := new(MetadataResponse) @@ -424,6 +432,7 @@ func TestClientReceivingUnknownTopic(t *testing.T) { } func TestClientReceivingPartialMetadata(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 5) @@ -479,6 +488,7 @@ func TestClientReceivingPartialMetadata(t *testing.T) { } func TestClientRefreshBehaviour(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 5) @@ -516,6 +526,7 @@ func TestClientRefreshBehaviour(t *testing.T) { } func TestClientRefreshBrokers(t *testing.T) { + t.Parallel() initialSeed := NewMockBroker(t, 0) leader := NewMockBroker(t, 5) @@ -547,6 +558,7 @@ func TestClientRefreshBrokers(t *testing.T) { } func TestClientRefreshMetadataBrokerOffline(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 5) @@ -577,6 +589,7 @@ func TestClientRefreshMetadataBrokerOffline(t *testing.T) { } func TestClientGetBroker(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 5) @@ -613,6 +626,7 @@ func TestClientGetBroker(t *testing.T) { } func TestClientResurrectDeadSeeds(t *testing.T) { + t.Parallel() initialSeed := NewMockBroker(t, 0) emptyMetadata := new(MetadataResponse) initialSeed.Returns(emptyMetadata) @@ -671,6 +685,7 @@ func TestClientResurrectDeadSeeds(t *testing.T) { safeClose(t, c) } +//nolint:paralleltest func TestClientController(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -718,13 +733,33 @@ func TestClientController(t *testing.T) { } func TestClientMetadataTimeout(t *testing.T) { - for _, timeout := range []time.Duration{ - 250 * time.Millisecond, // Will cut the first retry pass - 500 * time.Millisecond, // Will cut the second retry pass - 750 * time.Millisecond, // Will cut the third retry pass - 900 * time.Millisecond, // Will stop after the three retries - } { - t.Run(fmt.Sprintf("timeout=%v", timeout), func(t *testing.T) { + t.Parallel() + tests := []struct { + name string + timeout time.Duration + }{ + { + "timeout=250ms", + 250 * time.Millisecond, // Will cut the first retry pass + }, + { + "timeout=500ms", + 500 * time.Millisecond, // Will cut the second retry pass + }, + { + "timeout=750ms", + 750 * time.Millisecond, // Will cut the third retry pass + }, + { + "timeout=900ms", + 900 * time.Millisecond, // Will stop after the three retries + }, + } + + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() // Use a responsive broker to create a working client initialSeed := NewMockBroker(t, 0) emptyMetadata := new(MetadataResponse) @@ -737,7 +772,7 @@ func TestClientMetadataTimeout(t *testing.T) { conf.Metadata.Retry.Backoff = 0 conf.Metadata.RefreshFrequency = 0 // But configure a "global" timeout - conf.Metadata.Timeout = timeout + conf.Metadata.Timeout = tc.timeout c, err := NewClient([]string{initialSeed.Addr()}, conf) if err != nil { t.Fatal(err) @@ -766,7 +801,7 @@ func TestClientMetadataTimeout(t *testing.T) { // Check that the refresh fails fast enough (less than twice the configured timeout) // instead of at least: 100 ms * 2 brokers * 3 retries = 800 ms - maxRefreshDuration := 2 * timeout + maxRefreshDuration := 2 * tc.timeout select { case err := <-errChan: if err == nil { @@ -785,6 +820,7 @@ func TestClientMetadataTimeout(t *testing.T) { } func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) staleCoordinator := NewMockBroker(t, 2) freshCoordinator := NewMockBroker(t, 3) @@ -864,6 +900,7 @@ func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) { } func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) coordinator := NewMockBroker(t, 2) @@ -917,6 +954,7 @@ func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) { } func TestClientAutorefreshShutdownRace(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) metadataResponse := new(MetadataResponse) diff --git a/client_tls_test.go b/client_tls_test.go index 7c6e5cd31..d6d8e85be 100644 --- a/client_tls_test.go +++ b/client_tls_test.go @@ -13,6 +13,7 @@ import ( ) func TestTLS(t *testing.T) { + t.Parallel() cakey, err := rsa.GenerateKey(rand.Reader, 2048) if err != nil { t.Fatal(err) @@ -173,7 +174,11 @@ func TestTLS(t *testing.T) { }, }, } { - t.Run(tc.name, func(t *testing.T) { doListenerTLSTest(t, tc.Succeed, tc.Server, tc.Client) }) + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + doListenerTLSTest(t, tc.Succeed, tc.Server, tc.Client) + }) } } @@ -216,6 +221,7 @@ func doListenerTLSTest(t *testing.T, expectSuccess bool, serverConfig, clientCon } func TestSetServerName(t *testing.T) { + t.Parallel() if validServerNameTLS("kafka-server.domain.com:9093", nil).ServerName != "kafka-server.domain.com" { t.Fatal("Expected kafka-server.domain.com as tls.ServerName when tls config is nil") } diff --git a/config_test.go b/config_test.go index fdde01ff3..391b884be 100644 --- a/config_test.go +++ b/config_test.go @@ -8,6 +8,7 @@ import ( ) func TestDefaultConfigValidates(t *testing.T) { + t.Parallel() config := NewTestConfig() if err := config.Validate(); err != nil { t.Error(err) @@ -18,6 +19,7 @@ func TestDefaultConfigValidates(t *testing.T) { } func TestInvalidClientIDConfigValidates(t *testing.T) { + t.Parallel() config := NewTestConfig() config.ClientID = "foo:bar" if err := config.Validate(); string(err.(ConfigurationError)) != "ClientID is invalid" { @@ -26,6 +28,7 @@ func TestInvalidClientIDConfigValidates(t *testing.T) { } func TestEmptyClientIDConfigValidates(t *testing.T) { + t.Parallel() config := NewTestConfig() config.ClientID = "" if err := config.Validate(); string(err.(ConfigurationError)) != "ClientID is invalid" { @@ -40,6 +43,7 @@ func (t *DummyTokenProvider) Token() (*AccessToken, error) { } func TestNetConfigValidates(t *testing.T) { + t.Parallel() tests := []struct { name string cfg func(*Config) // resorting to using a function as a param because of internal composite structs @@ -235,6 +239,7 @@ func TestNetConfigValidates(t *testing.T) { } func TestMetadataConfigValidates(t *testing.T) { + t.Parallel() tests := []struct { name string cfg func(*Config) // resorting to using a function as a param because of internal composite structs @@ -273,6 +278,7 @@ func TestMetadataConfigValidates(t *testing.T) { } func TestAdminConfigValidates(t *testing.T) { + t.Parallel() tests := []struct { name string cfg func(*Config) // resorting to using a function as a param because of internal composite structs @@ -297,6 +303,7 @@ func TestAdminConfigValidates(t *testing.T) { } func TestProducerConfigValidates(t *testing.T) { + t.Parallel() tests := []struct { name string cfg func(*Config) // resorting to using a function as a param because of internal composite structs @@ -426,6 +433,7 @@ func TestProducerConfigValidates(t *testing.T) { } func TestConsumerConfigValidates(t *testing.T) { + t.Parallel() tests := []struct { name string cfg func(*Config) @@ -459,6 +467,7 @@ func TestConsumerConfigValidates(t *testing.T) { } func TestLZ4ConfigValidation(t *testing.T) { + t.Parallel() config := NewTestConfig() config.Producer.Compression = CompressionLZ4 if err := config.Validate(); string(err.(ConfigurationError)) != "lz4 compression requires Version >= V0_10_0_0" { @@ -471,6 +480,7 @@ func TestLZ4ConfigValidation(t *testing.T) { } func TestZstdConfigValidation(t *testing.T) { + t.Parallel() config := NewTestConfig() config.Producer.Compression = CompressionZSTD if err := config.Validate(); string(err.(ConfigurationError)) != "zstd compression requires Version >= V2_1_0_0" { diff --git a/consumer_group_members_test.go b/consumer_group_members_test.go index a99de61c6..a7ce8c350 100644 --- a/consumer_group_members_test.go +++ b/consumer_group_members_test.go @@ -45,6 +45,7 @@ var ( ) func TestConsumerGroupMemberMetadata(t *testing.T) { + t.Parallel() meta := &ConsumerGroupMemberMetadata{ Version: 0, Topics: []string{"one", "two"}, @@ -68,6 +69,7 @@ func TestConsumerGroupMemberMetadata(t *testing.T) { } func TestConsumerGroupMemberMetadataV1Decode(t *testing.T) { + t.Parallel() meta := new(ConsumerGroupMemberMetadata) if err := decode(groupMemberMetadataV1, meta); err != nil { t.Error("Failed to decode V1 data", err) @@ -78,6 +80,7 @@ func TestConsumerGroupMemberMetadataV1Decode(t *testing.T) { } func TestConsumerGroupMemberAssignment(t *testing.T) { + t.Parallel() amt := &ConsumerGroupMemberAssignment{ Version: 0, Topics: map[string][]int32{ diff --git a/consumer_metadata_request_test.go b/consumer_metadata_request_test.go index 3f6bcdb57..100c9354a 100644 --- a/consumer_metadata_request_test.go +++ b/consumer_metadata_request_test.go @@ -15,6 +15,7 @@ var ( ) func TestConsumerMetadataRequest(t *testing.T) { + t.Parallel() request := new(ConsumerMetadataRequest) testEncodable(t, "empty string", request, consumerMetadataRequestEmpty) testVersionDecodable(t, "empty string", request, consumerMetadataRequestEmpty, 0) diff --git a/consumer_metadata_response_test.go b/consumer_metadata_response_test.go index a7d74b55b..6b0f782d2 100644 --- a/consumer_metadata_response_test.go +++ b/consumer_metadata_response_test.go @@ -19,6 +19,7 @@ var ( ) func TestConsumerMetadataResponseError(t *testing.T) { + t.Parallel() response := &ConsumerMetadataResponse{Err: ErrOffsetsLoadInProgress} testEncodable(t, "", response, consumerMetadataResponseError) @@ -33,6 +34,7 @@ func TestConsumerMetadataResponseError(t *testing.T) { } func TestConsumerMetadataResponseSuccess(t *testing.T) { + t.Parallel() broker := NewBroker("foo:52445") broker.id = 0xAB response := ConsumerMetadataResponse{ diff --git a/consumer_test.go b/consumer_test.go index 84aecb678..038bbd542 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -16,6 +16,7 @@ var testMsg = StringEncoder("Foo") // If a particular offset is provided then messages are consumed starting from // that offset. func TestConsumerOffsetManual(t *testing.T) { + t.Parallel() // Given broker0 := NewMockBroker(t, 0) @@ -78,6 +79,7 @@ func TestConsumerOffsetManual(t *testing.T) { } func TestPauseResumeConsumption(t *testing.T) { + t.Parallel() // Given broker0 := NewMockBroker(t, 0) @@ -162,6 +164,7 @@ func TestPauseResumeConsumption(t *testing.T) { // message indeed corresponds to the offset that broker claims to be the // newest in its metadata response. func TestConsumerOffsetNewest(t *testing.T) { + t.Parallel() // Given offsetNewest := int64(10) offsetNewestAfterFetchRequest := int64(50) @@ -208,6 +211,7 @@ func TestConsumerOffsetNewest(t *testing.T) { // If `OffsetOldest` is passed as the initial offset then the first consumed // message is indeed the first available in the partition. func TestConsumerOffsetOldest(t *testing.T) { + t.Parallel() // Given offsetNewest := int64(10) broker0 := NewMockBroker(t, 0) @@ -255,6 +259,7 @@ func TestConsumerOffsetOldest(t *testing.T) { // It is possible to close a partition consumer and create the same anew. func TestConsumerRecreate(t *testing.T) { + t.Parallel() // Given broker0 := NewMockBroker(t, 0) broker0.SetHandlerByMap(map[string]MockResponse{ @@ -296,6 +301,7 @@ func TestConsumerRecreate(t *testing.T) { // An attempt to consume the same partition twice should fail. func TestConsumerDuplicate(t *testing.T) { + t.Parallel() // Given broker0 := NewMockBroker(t, 0) broker0.SetHandlerByMap(map[string]MockResponse{ @@ -407,6 +413,7 @@ func runConsumerLeaderRefreshErrorTestWithConfig(t *testing.T, config *Config) { // If consumer fails to refresh metadata it keeps retrying with frequency // specified by `Config.Consumer.Retry.Backoff`. func TestConsumerLeaderRefreshError(t *testing.T) { + t.Parallel() config := NewTestConfig() config.Net.ReadTimeout = 100 * time.Millisecond config.Consumer.Retry.Backoff = 200 * time.Millisecond @@ -417,6 +424,7 @@ func TestConsumerLeaderRefreshError(t *testing.T) { } func TestConsumerLeaderRefreshErrorWithBackoffFunc(t *testing.T) { + t.Parallel() var calls int32 = 0 config := NewTestConfig() @@ -437,6 +445,7 @@ func TestConsumerLeaderRefreshErrorWithBackoffFunc(t *testing.T) { } func TestConsumerInvalidTopic(t *testing.T) { + t.Parallel() // Given broker0 := NewMockBroker(t, 100) broker0.SetHandlerByMap(map[string]MockResponse{ @@ -464,6 +473,7 @@ func TestConsumerInvalidTopic(t *testing.T) { // Nothing bad happens if a partition consumer that has no leader assigned at // the moment is closed. func TestConsumerClosePartitionWithoutLeader(t *testing.T) { + t.Parallel() // Given broker0 := NewMockBroker(t, 100) broker0.SetHandlerByMap(map[string]MockResponse{ @@ -518,6 +528,7 @@ func TestConsumerClosePartitionWithoutLeader(t *testing.T) { // actual offset range for the partition, then the partition consumer stops // immediately closing its output channels. func TestConsumerShutsDownOutOfRange(t *testing.T) { + t.Parallel() // Given broker0 := NewMockBroker(t, 0) fetchResponse := new(FetchResponse) @@ -556,6 +567,7 @@ func TestConsumerShutsDownOutOfRange(t *testing.T) { // If a fetch response contains messages with offsets that are smaller then // requested, then such messages are ignored. func TestConsumerExtraOffsets(t *testing.T) { + t.Parallel() // Given legacyFetchResponse := &FetchResponse{} legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 1) @@ -630,6 +642,7 @@ func TestConsumerExtraOffsets(t *testing.T) { // messages older then requested, even though there would be // more messages if higher offset was requested. func TestConsumerReceivingFetchResponseWithTooOldRecords(t *testing.T) { + t.Parallel() // Given fetchResponse1 := &FetchResponse{Version: 4} fetchResponse1.AddRecord("my_topic", 0, nil, testMsg, 1) @@ -678,6 +691,7 @@ func TestConsumerReceivingFetchResponseWithTooOldRecords(t *testing.T) { } func TestConsumeMessageWithNewerFetchAPIVersion(t *testing.T) { + t.Parallel() // Given fetchResponse1 := &FetchResponse{Version: 4} fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1) @@ -721,6 +735,7 @@ func TestConsumeMessageWithNewerFetchAPIVersion(t *testing.T) { } func TestConsumeMessageWithSessionIDs(t *testing.T) { + t.Parallel() // Given fetchResponse1 := &FetchResponse{Version: 7} fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1) @@ -770,6 +785,7 @@ func TestConsumeMessageWithSessionIDs(t *testing.T) { } func TestConsumeMessagesFromReadReplica(t *testing.T) { + t.Parallel() // Given fetchResponse1 := &FetchResponse{Version: 11} fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1) @@ -848,6 +864,7 @@ func TestConsumeMessagesFromReadReplica(t *testing.T) { } func TestConsumeMessagesFromReadReplicaLeaderFallback(t *testing.T) { + t.Parallel() // Given fetchResponse1 := &FetchResponse{Version: 11} fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1) @@ -900,6 +917,7 @@ func TestConsumeMessagesFromReadReplicaLeaderFallback(t *testing.T) { } func TestConsumeMessagesFromReadReplicaErrorReplicaNotAvailable(t *testing.T) { + t.Parallel() // Given fetchResponse1 := &FetchResponse{Version: 11} block1 := fetchResponse1.getOrCreateBlock("my_topic", 0) @@ -972,6 +990,7 @@ func TestConsumeMessagesFromReadReplicaErrorReplicaNotAvailable(t *testing.T) { } func TestConsumeMessagesFromReadReplicaErrorUnknown(t *testing.T) { + t.Parallel() // Given fetchResponse1 := &FetchResponse{Version: 11} block1 := fetchResponse1.getOrCreateBlock("my_topic", 0) @@ -1050,6 +1069,7 @@ func TestConsumeMessagesFromReadReplicaErrorUnknown(t *testing.T) { // // See https://github.com/Shopify/sarama/issues/1927 func TestConsumeMessagesTrackLeader(t *testing.T) { + t.Parallel() cfg := NewConfig() cfg.ClientID = t.Name() cfg.Metadata.RefreshFrequency = time.Millisecond * 50 @@ -1161,6 +1181,7 @@ func TestConsumeMessagesTrackLeader(t *testing.T) { // It is fine if offsets of fetched messages are not sequential (although // strictly increasing!). func TestConsumerNonSequentialOffsets(t *testing.T) { + t.Parallel() // Given legacyFetchResponse := &FetchResponse{} legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 5) @@ -1220,6 +1241,7 @@ func TestConsumerNonSequentialOffsets(t *testing.T) { // If leadership for a partition is changing then consumer resolves the new // leader and switches to it. func TestConsumerRebalancingMultiplePartitions(t *testing.T) { + t.Parallel() // initial setup seedBroker := NewMockBroker(t, 10) leader0 := NewMockBroker(t, 0) @@ -1418,6 +1440,7 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) { // consumer channel buffer is full then that does not affect the ability to // read messages by the other consumer. func TestConsumerInterleavedClose(t *testing.T) { + t.Parallel() // Given broker0 := NewMockBroker(t, 0) broker0.SetHandlerByMap(map[string]MockResponse{ @@ -1466,6 +1489,7 @@ func TestConsumerInterleavedClose(t *testing.T) { } func TestConsumerBounceWithReferenceOpen(t *testing.T) { + t.Parallel() broker0 := NewMockBroker(t, 0) broker0Addr := broker0.Addr() broker1 := NewMockBroker(t, 1) @@ -1564,6 +1588,7 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) { } func TestConsumerOffsetOutOfRange(t *testing.T) { + t.Parallel() // Given broker0 := NewMockBroker(t, 2) broker0.SetHandlerByMap(map[string]MockResponse{ @@ -1596,6 +1621,7 @@ func TestConsumerOffsetOutOfRange(t *testing.T) { } func TestConsumerExpiryTicker(t *testing.T) { + t.Parallel() // Given broker0 := NewMockBroker(t, 0) fetchResponse1 := &FetchResponse{} @@ -1638,6 +1664,7 @@ func TestConsumerExpiryTicker(t *testing.T) { } func TestConsumerTimestamps(t *testing.T) { + t.Parallel() now := time.Now().Truncate(time.Millisecond) type testMessage struct { key Encoder @@ -1755,6 +1782,7 @@ func TestConsumerTimestamps(t *testing.T) { // When set to ReadCommitted, no uncommitted message should be available in messages channel func TestExcludeUncommitted(t *testing.T) { + t.Parallel() // Given broker0 := NewMockBroker(t, 0) @@ -1868,6 +1896,7 @@ ConsumerLoop: } func Test_partitionConsumer_parseResponse(t *testing.T) { + t.Parallel() type args struct { response *FetchResponse } @@ -1894,7 +1923,9 @@ func Test_partitionConsumer_parseResponse(t *testing.T) { }, } for _, tt := range tests { + tt := tt t.Run(tt.name, func(t *testing.T) { + t.Parallel() child := &partitionConsumer{ broker: &brokerConsumer{ broker: &Broker{}, @@ -1914,6 +1945,7 @@ func Test_partitionConsumer_parseResponse(t *testing.T) { } func Test_partitionConsumer_parseResponseEmptyBatch(t *testing.T) { + t.Parallel() lrbOffset := int64(5) block := &FetchResponseBlock{ HighWaterMarkOffset: 10, @@ -1996,6 +2028,7 @@ func testConsumerInterceptor( } func TestConsumerInterceptors(t *testing.T) { + t.Parallel() tests := []struct { name string interceptors []ConsumerInterceptor @@ -2051,6 +2084,10 @@ func TestConsumerInterceptors(t *testing.T) { }, } for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { testConsumerInterceptor(t, tt.interceptors, tt.expectationFn) }) + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + testConsumerInterceptor(t, tt.interceptors, tt.expectationFn) + }) } } diff --git a/control_record_test.go b/control_record_test.go index 8a1d74863..871edbab6 100644 --- a/control_record_test.go +++ b/control_record_test.go @@ -46,6 +46,7 @@ func assertRecordType(t *testing.T, r *ControlRecord, expected ControlRecordType } func TestDecodingControlRecords(t *testing.T) { + t.Parallel() abortTx := testDecode(t, "abort transaction", abortTxCtrlRecKey, abortTxCtrlRecValue) assertRecordType(t, &abortTx, ControlRecordAbort) diff --git a/create_partitions_request_test.go b/create_partitions_request_test.go index 8230b3866..0c5923cda 100644 --- a/create_partitions_request_test.go +++ b/create_partitions_request_test.go @@ -30,6 +30,7 @@ var ( ) func TestCreatePartitionsRequest(t *testing.T) { + t.Parallel() req := &CreatePartitionsRequest{ TopicPartitions: map[string]*TopicPartition{ "topic": { diff --git a/create_partitions_response_test.go b/create_partitions_response_test.go index 2f5967494..9f9ee64ee 100644 --- a/create_partitions_response_test.go +++ b/create_partitions_response_test.go @@ -25,6 +25,7 @@ var ( ) func TestCreatePartitionsResponse(t *testing.T) { + t.Parallel() resp := &CreatePartitionsResponse{ ThrottleTime: 100 * time.Millisecond, TopicPartitionErrors: map[string]*TopicPartitionError{ @@ -52,6 +53,7 @@ func TestCreatePartitionsResponse(t *testing.T) { } func TestTopicPartitionError(t *testing.T) { + t.Parallel() // Assert that TopicPartitionError satisfies error interface var err error = &TopicPartitionError{ Err: ErrTopicAuthorizationFailed, diff --git a/create_topics_request_test.go b/create_topics_request_test.go index daf223c53..3298e06a2 100644 --- a/create_topics_request_test.go +++ b/create_topics_request_test.go @@ -23,6 +23,7 @@ var ( ) func TestCreateTopicsRequest(t *testing.T) { + t.Parallel() retention := "-1" req := &CreateTopicsRequest{ diff --git a/create_topics_response_test.go b/create_topics_response_test.go index 2dfb8d159..73d803784 100644 --- a/create_topics_response_test.go +++ b/create_topics_response_test.go @@ -29,6 +29,7 @@ var ( ) func TestCreateTopicsResponse(t *testing.T) { + t.Parallel() resp := &CreateTopicsResponse{ TopicErrors: map[string]*TopicError{ "topic": { @@ -52,6 +53,7 @@ func TestCreateTopicsResponse(t *testing.T) { } func TestTopicError(t *testing.T) { + t.Parallel() // Assert that TopicError satisfies error interface var err error = &TopicError{ Err: ErrTopicAuthorizationFailed, diff --git a/delete_groups_request_test.go b/delete_groups_request_test.go index 908172498..cf1144493 100644 --- a/delete_groups_request_test.go +++ b/delete_groups_request_test.go @@ -18,6 +18,7 @@ var ( ) func TestDeleteGroupsRequest(t *testing.T) { + t.Parallel() var request *DeleteGroupsRequest request = new(DeleteGroupsRequest) diff --git a/delete_groups_response_test.go b/delete_groups_response_test.go index 6f622b5f0..e524d288f 100644 --- a/delete_groups_response_test.go +++ b/delete_groups_response_test.go @@ -26,6 +26,7 @@ var ( ) func TestDeleteGroupsResponse(t *testing.T) { + t.Parallel() var response *DeleteGroupsResponse response = new(DeleteGroupsResponse) diff --git a/delete_offsets_request_test.go b/delete_offsets_request_test.go index 0eea3fc9f..3e9769c8f 100644 --- a/delete_offsets_request_test.go +++ b/delete_offsets_request_test.go @@ -10,6 +10,7 @@ var ( ) func TestDeleteOffsetsRequest(t *testing.T) { + t.Parallel() var request *DeleteOffsetsRequest request = new(DeleteOffsetsRequest) diff --git a/delete_offsets_response_test.go b/delete_offsets_response_test.go index 8d069dbcb..aec939bbb 100644 --- a/delete_offsets_response_test.go +++ b/delete_offsets_response_test.go @@ -33,6 +33,7 @@ var ( ) func TestDeleteOffsetsResponse(t *testing.T) { + t.Parallel() var response *DeleteOffsetsResponse response = &DeleteOffsetsResponse{ diff --git a/delete_records_request_test.go b/delete_records_request_test.go index c72960cfb..f42d2ffe7 100644 --- a/delete_records_request_test.go +++ b/delete_records_request_test.go @@ -19,6 +19,7 @@ var deleteRecordsRequest = []byte{ } func TestDeleteRecordsRequest(t *testing.T) { + t.Parallel() req := &DeleteRecordsRequest{ Topics: map[string]*DeleteRecordsRequestTopic{ "topic": { diff --git a/delete_records_response_test.go b/delete_records_response_test.go index 3653cdc41..32e535b20 100644 --- a/delete_records_response_test.go +++ b/delete_records_response_test.go @@ -21,6 +21,7 @@ var deleteRecordsResponse = []byte{ } func TestDeleteRecordsResponse(t *testing.T) { + t.Parallel() resp := &DeleteRecordsResponse{ Version: 0, ThrottleTime: 100 * time.Millisecond, diff --git a/delete_topics_request_test.go b/delete_topics_request_test.go index c313a2f3b..6f4ea1ba3 100644 --- a/delete_topics_request_test.go +++ b/delete_topics_request_test.go @@ -13,6 +13,7 @@ var deleteTopicsRequest = []byte{ } func TestDeleteTopicsRequestV0(t *testing.T) { + t.Parallel() req := &DeleteTopicsRequest{ Version: 0, Topics: []string{"topic", "other"}, @@ -23,6 +24,7 @@ func TestDeleteTopicsRequestV0(t *testing.T) { } func TestDeleteTopicsRequestV1(t *testing.T) { + t.Parallel() req := &DeleteTopicsRequest{ Version: 1, Topics: []string{"topic", "other"}, diff --git a/delete_topics_response_test.go b/delete_topics_response_test.go index 516f1a3bd..534acd4b8 100644 --- a/delete_topics_response_test.go +++ b/delete_topics_response_test.go @@ -21,6 +21,7 @@ var ( ) func TestDeleteTopicsResponse(t *testing.T) { + t.Parallel() resp := &DeleteTopicsResponse{ TopicErrorCodes: map[string]KError{ "topic": ErrNoError, diff --git a/describe_client_quotas_request_test.go b/describe_client_quotas_request_test.go index 6b8c0c9c5..790443e4a 100644 --- a/describe_client_quotas_request_test.go +++ b/describe_client_quotas_request_test.go @@ -37,6 +37,7 @@ var ( ) func TestDescribeClientQuotasRequest(t *testing.T) { + t.Parallel() // Match All req := &DescribeClientQuotasRequest{ Components: []QuotaFilterComponent{}, diff --git a/describe_client_quotas_response_test.go b/describe_client_quotas_response_test.go index 6c681d11c..1248838ec 100644 --- a/describe_client_quotas_response_test.go +++ b/describe_client_quotas_response_test.go @@ -44,6 +44,7 @@ var ( ) func TestDescribeClientQuotasResponse(t *testing.T) { + t.Parallel() // Response With Error errMsg := "Custom entity type 'faulty' not supported" res := &DescribeClientQuotasResponse{ diff --git a/describe_configs_request_test.go b/describe_configs_request_test.go index a1148f401..79160ad2f 100644 --- a/describe_configs_request_test.go +++ b/describe_configs_request_test.go @@ -49,6 +49,7 @@ var ( ) func TestDescribeConfigsRequestv0(t *testing.T) { + t.Parallel() var request *DescribeConfigsRequest request = &DescribeConfigsRequest{ @@ -102,6 +103,7 @@ func TestDescribeConfigsRequestv0(t *testing.T) { } func TestDescribeConfigsRequestv1(t *testing.T) { + t.Parallel() request := &DescribeConfigsRequest{ Version: 1, Resources: []*ConfigResource{ diff --git a/describe_configs_response_test.go b/describe_configs_response_test.go index ea8f28e57..0aa602d6a 100644 --- a/describe_configs_response_test.go +++ b/describe_configs_response_test.go @@ -93,6 +93,7 @@ var ( ) func TestDescribeConfigsResponsev0(t *testing.T) { + t.Parallel() var response *DescribeConfigsResponse response = &DescribeConfigsResponse{ @@ -127,6 +128,7 @@ func TestDescribeConfigsResponsev0(t *testing.T) { } func TestDescribeConfigsResponseWithDefaultv0(t *testing.T) { + t.Parallel() var response *DescribeConfigsResponse response = &DescribeConfigsResponse{ @@ -161,6 +163,7 @@ func TestDescribeConfigsResponseWithDefaultv0(t *testing.T) { } func TestDescribeConfigsResponsev1(t *testing.T) { + t.Parallel() var response *DescribeConfigsResponse response = &DescribeConfigsResponse{ @@ -197,6 +200,7 @@ func TestDescribeConfigsResponsev1(t *testing.T) { } func TestDescribeConfigsResponseWithSynonym(t *testing.T) { + t.Parallel() var response *DescribeConfigsResponse response = &DescribeConfigsResponse{ @@ -239,6 +243,7 @@ func TestDescribeConfigsResponseWithSynonym(t *testing.T) { } func TestDescribeConfigsResponseWithDefaultv1(t *testing.T) { + t.Parallel() var response *DescribeConfigsResponse response = &DescribeConfigsResponse{ diff --git a/describe_groups_request_test.go b/describe_groups_request_test.go index 7d45f3fee..3a538263d 100644 --- a/describe_groups_request_test.go +++ b/describe_groups_request_test.go @@ -18,6 +18,7 @@ var ( ) func TestDescribeGroupsRequest(t *testing.T) { + t.Parallel() var request *DescribeGroupsRequest request = new(DescribeGroupsRequest) diff --git a/describe_groups_response_test.go b/describe_groups_response_test.go index dd3973191..b63b25ec3 100644 --- a/describe_groups_response_test.go +++ b/describe_groups_response_test.go @@ -35,6 +35,7 @@ var ( ) func TestDescribeGroupsResponse(t *testing.T) { + t.Parallel() var response *DescribeGroupsResponse response = new(DescribeGroupsResponse) diff --git a/describe_log_dirs_request_test.go b/describe_log_dirs_request_test.go index 65b88c685..f2b140ad1 100644 --- a/describe_log_dirs_request_test.go +++ b/describe_log_dirs_request_test.go @@ -15,6 +15,7 @@ var ( ) func TestDescribeLogDirsRequest(t *testing.T) { + t.Parallel() request := &DescribeLogDirsRequest{ Version: 0, DescribeTopics: []DescribeLogDirsRequestTopic{}, diff --git a/describe_log_dirs_response_test.go b/describe_log_dirs_response_test.go index 5a1eb5b5e..f533e8e0c 100644 --- a/describe_log_dirs_response_test.go +++ b/describe_log_dirs_response_test.go @@ -32,6 +32,7 @@ var ( ) func TestDescribeLogDirsResponse(t *testing.T) { + t.Parallel() // Test empty response response := &DescribeLogDirsResponse{ LogDirs: []DescribeLogDirsResponseDirMetadata{}, diff --git a/describe_user_scram_credentials_request_test.go b/describe_user_scram_credentials_request_test.go index 87e52bab6..a522a075b 100644 --- a/describe_user_scram_credentials_request_test.go +++ b/describe_user_scram_credentials_request_test.go @@ -15,6 +15,7 @@ var ( ) func TestDescribeUserScramCredentialsRequest(t *testing.T) { + t.Parallel() request := &DescribeUserScramCredentialsRequest{ Version: 0, DescribeUsers: []DescribeUserScramCredentialsRequestUser{}, diff --git a/describe_user_scram_credentials_response_test.go b/describe_user_scram_credentials_response_test.go index a251eaf7a..66b7a2884 100644 --- a/describe_user_scram_credentials_response_test.go +++ b/describe_user_scram_credentials_response_test.go @@ -30,6 +30,7 @@ var ( ) func TestDescribeUserScramCredentialsResponse(t *testing.T) { + t.Parallel() response := &DescribeUserScramCredentialsResponse{ Version: 0, ThrottleTime: time.Second * 3, diff --git a/end_txn_request_test.go b/end_txn_request_test.go index 6f5d4480e..cc8fc7ee2 100644 --- a/end_txn_request_test.go +++ b/end_txn_request_test.go @@ -10,6 +10,7 @@ var endTxnRequest = []byte{ } func TestEndTxnRequest(t *testing.T) { + t.Parallel() req := &EndTxnRequest{ TransactionalID: "txn", ProducerID: 8000, diff --git a/end_txn_response_test.go b/end_txn_response_test.go index d7ae1c988..291cfb653 100644 --- a/end_txn_response_test.go +++ b/end_txn_response_test.go @@ -11,6 +11,7 @@ var endTxnResponse = []byte{ } func TestEndTxnResponse(t *testing.T) { + t.Parallel() resp := &EndTxnResponse{ ThrottleTime: 100 * time.Millisecond, Err: ErrInvalidProducerIDMapping, diff --git a/examples/http_server/http_server_test.go b/examples/http_server/http_server_test.go index ac3ba4d07..f171c85b2 100644 --- a/examples/http_server/http_server_test.go +++ b/examples/http_server/http_server_test.go @@ -14,6 +14,7 @@ import ( // and one data collector entry. Let's assume both will succeed. // We should return a HTTP 200 status. func TestCollectSuccessfully(t *testing.T) { + t.Parallel() dataCollectorMock := mocks.NewSyncProducer(t, nil) dataCollectorMock.ExpectSendMessageAndSucceed() @@ -50,6 +51,7 @@ func TestCollectSuccessfully(t *testing.T) { // Now, let's see if we handle the case of not being able to produce // to the data collector properly. In this case we should return a 500 status. func TestCollectionFailure(t *testing.T) { + t.Parallel() dataCollectorMock := mocks.NewSyncProducer(t, nil) dataCollectorMock.ExpectSendMessageAndFail(sarama.ErrRequestTimedOut) @@ -78,6 +80,7 @@ func TestCollectionFailure(t *testing.T) { // so we are not setting any expectations on the dataCollectorMock. It // will still generate an access log entry though. func TestWrongPath(t *testing.T) { + t.Parallel() dataCollectorMock := mocks.NewSyncProducer(t, nil) accessLogProducerMock := mocks.NewAsyncProducer(t, nil) diff --git a/fetch_request_test.go b/fetch_request_test.go index 0b807b9f8..9fd59b40c 100644 --- a/fetch_request_test.go +++ b/fetch_request_test.go @@ -51,12 +51,15 @@ var ( ) func TestFetchRequest(t *testing.T) { + t.Parallel() t.Run("no blocks", func(t *testing.T) { + t.Parallel() request := new(FetchRequest) testRequest(t, "no blocks", request, fetchRequestNoBlocks) }) t.Run("with properties", func(t *testing.T) { + t.Parallel() request := new(FetchRequest) request.MaxWaitTime = 0x20 request.MinBytes = 0xEF @@ -64,6 +67,7 @@ func TestFetchRequest(t *testing.T) { }) t.Run("one block", func(t *testing.T) { + t.Parallel() request := new(FetchRequest) request.MaxWaitTime = 0 request.MinBytes = 0 @@ -72,6 +76,7 @@ func TestFetchRequest(t *testing.T) { }) t.Run("one block v4", func(t *testing.T) { + t.Parallel() request := new(FetchRequest) request.Version = 4 request.MaxBytes = 0xFF @@ -81,6 +86,7 @@ func TestFetchRequest(t *testing.T) { }) t.Run("one block v11 rackid", func(t *testing.T) { + t.Parallel() request := new(FetchRequest) request.Version = 11 request.MaxBytes = 0xFF diff --git a/fetch_response_test.go b/fetch_response_test.go index 3ba3eb5d1..badbac821 100644 --- a/fetch_response_test.go +++ b/fetch_response_test.go @@ -202,6 +202,7 @@ var ( ) func TestEmptyFetchResponse(t *testing.T) { + t.Parallel() response := FetchResponse{} testVersionDecodable(t, "empty", &response, emptyFetchResponse, 0) @@ -211,6 +212,7 @@ func TestEmptyFetchResponse(t *testing.T) { } func TestOneMessageFetchResponse(t *testing.T) { + t.Parallel() response := FetchResponse{} testVersionDecodable(t, "one message", &response, oneMessageFetchResponse, 0) @@ -267,6 +269,7 @@ func TestOneMessageFetchResponse(t *testing.T) { } func TestOverflowMessageFetchResponse(t *testing.T) { + t.Parallel() response := FetchResponse{} testVersionDecodable(t, "overflow message", &response, overflowMessageFetchResponse, 0) @@ -327,6 +330,7 @@ func TestOverflowMessageFetchResponse(t *testing.T) { } func TestOneRecordFetchResponse(t *testing.T) { + t.Parallel() response := FetchResponse{} testVersionDecodable(t, "one record", &response, oneRecordFetchResponse, 4) @@ -376,6 +380,7 @@ func TestOneRecordFetchResponse(t *testing.T) { } func TestPartailFetchResponse(t *testing.T) { + t.Parallel() response := FetchResponse{} testVersionDecodable(t, "partial record", &response, partialFetchResponse, 4) @@ -418,6 +423,7 @@ func TestPartailFetchResponse(t *testing.T) { } func TestEmptyRecordsFetchResponse(t *testing.T) { + t.Parallel() response := FetchResponse{} testVersionDecodable(t, "empty record", &response, emptyRecordsFetchResponsev11, 11) @@ -463,6 +469,7 @@ func TestEmptyRecordsFetchResponse(t *testing.T) { } func TestOneMessageFetchResponseV4(t *testing.T) { + t.Parallel() response := FetchResponse{} testVersionDecodable(t, "one message v4", &response, oneMessageFetchResponseV4, 4) @@ -519,6 +526,7 @@ func TestOneMessageFetchResponseV4(t *testing.T) { } func TestPreferredReplicaFetchResponseV11(t *testing.T) { + t.Parallel() response := FetchResponse{} testVersionDecodable( t, "preferred replica fetch response v11", &response, diff --git a/find_coordinator_request_test.go b/find_coordinator_request_test.go index 7e889b074..c5c56ceb5 100644 --- a/find_coordinator_request_test.go +++ b/find_coordinator_request_test.go @@ -15,6 +15,7 @@ var ( ) func TestFindCoordinatorRequest(t *testing.T) { + t.Parallel() req := &FindCoordinatorRequest{ Version: 1, CoordinatorKey: "group", diff --git a/find_coordinator_response_test.go b/find_coordinator_response_test.go index 417a76c6e..f790e01d0 100644 --- a/find_coordinator_response_test.go +++ b/find_coordinator_response_test.go @@ -6,6 +6,7 @@ import ( ) func TestFindCoordinatorResponse(t *testing.T) { + t.Parallel() errMsg := "kaboom" for _, tc := range []struct { diff --git a/heartbeat_request_test.go b/heartbeat_request_test.go index 2653f82c7..c2588386c 100644 --- a/heartbeat_request_test.go +++ b/heartbeat_request_test.go @@ -9,6 +9,7 @@ var basicHeartbeatRequest = []byte{ } func TestHeartbeatRequest(t *testing.T) { + t.Parallel() request := new(HeartbeatRequest) request.GroupId = "foo" request.GenerationId = 66051 diff --git a/heartbeat_response_test.go b/heartbeat_response_test.go index e60146bdc..6ca6eab61 100644 --- a/heartbeat_response_test.go +++ b/heartbeat_response_test.go @@ -7,6 +7,7 @@ var heartbeatResponseNoError = []byte{ } func TestHeartbeatResponse(t *testing.T) { + t.Parallel() response := new(HeartbeatResponse) testVersionDecodable(t, "no error", response, heartbeatResponseNoError, 0) if response.Err != ErrNoError { diff --git a/incremental_alter_configs_request_test.go b/incremental_alter_configs_request_test.go index d239bf5f5..977869011 100644 --- a/incremental_alter_configs_request_test.go +++ b/incremental_alter_configs_request_test.go @@ -44,6 +44,7 @@ var ( ) func TestIncrementalAlterConfigsRequest(t *testing.T) { + t.Parallel() var request *IncrementalAlterConfigsRequest request = &IncrementalAlterConfigsRequest{ diff --git a/incremental_alter_configs_response_test.go b/incremental_alter_configs_response_test.go index 697156709..0e9f92adc 100644 --- a/incremental_alter_configs_response_test.go +++ b/incremental_alter_configs_response_test.go @@ -21,6 +21,7 @@ var ( ) func TestIncrementalAlterConfigsResponse(t *testing.T) { + t.Parallel() var response *IncrementalAlterConfigsResponse response = &IncrementalAlterConfigsResponse{ diff --git a/init_producer_id_request_test.go b/init_producer_id_request_test.go index 5c83d8514..78261884c 100644 --- a/init_producer_id_request_test.go +++ b/init_producer_id_request_test.go @@ -18,6 +18,7 @@ var ( ) func TestInitProducerIDRequest(t *testing.T) { + t.Parallel() req := &InitProducerIDRequest{ TransactionTimeout: 100 * time.Millisecond, } diff --git a/init_producer_id_response_test.go b/init_producer_id_response_test.go index b0649386a..15f07f330 100644 --- a/init_producer_id_response_test.go +++ b/init_producer_id_response_test.go @@ -22,6 +22,7 @@ var ( ) func TestInitProducerIDResponse(t *testing.T) { + t.Parallel() resp := &InitProducerIDResponse{ ThrottleTime: 100 * time.Millisecond, ProducerID: 8000, diff --git a/join_group_request_test.go b/join_group_request_test.go index a2e17f980..bcdd85575 100644 --- a/join_group_request_test.go +++ b/join_group_request_test.go @@ -34,6 +34,7 @@ var ( ) func TestJoinGroupRequest(t *testing.T) { + t.Parallel() request := new(JoinGroupRequest) request.GroupId = "TestGroup" request.SessionTimeout = 100 @@ -42,6 +43,7 @@ func TestJoinGroupRequest(t *testing.T) { } func TestJoinGroupRequestV0_OneProtocol(t *testing.T) { + t.Parallel() request := new(JoinGroupRequest) request.GroupId = "TestGroup" request.SessionTimeout = 100 @@ -55,6 +57,7 @@ func TestJoinGroupRequestV0_OneProtocol(t *testing.T) { } func TestJoinGroupRequestDeprecatedEncode(t *testing.T) { + t.Parallel() request := new(JoinGroupRequest) request.GroupId = "TestGroup" request.SessionTimeout = 100 @@ -68,6 +71,7 @@ func TestJoinGroupRequestDeprecatedEncode(t *testing.T) { } func TestJoinGroupRequestV1(t *testing.T) { + t.Parallel() request := new(JoinGroupRequest) request.Version = 1 request.GroupId = "TestGroup" diff --git a/join_group_response_test.go b/join_group_response_test.go index a43b37a95..6ec1b643e 100644 --- a/join_group_response_test.go +++ b/join_group_response_test.go @@ -56,6 +56,7 @@ var ( ) func TestJoinGroupResponseV0(t *testing.T) { + t.Parallel() var response *JoinGroupResponse response = new(JoinGroupResponse) @@ -117,6 +118,7 @@ func TestJoinGroupResponseV0(t *testing.T) { } func TestJoinGroupResponseV1(t *testing.T) { + t.Parallel() response := new(JoinGroupResponse) testVersionDecodable(t, "no error", response, joinGroupResponseV1, 1) if response.Err != ErrNoError { @@ -143,6 +145,7 @@ func TestJoinGroupResponseV1(t *testing.T) { } func TestJoinGroupResponseV2(t *testing.T) { + t.Parallel() response := new(JoinGroupResponse) testVersionDecodable(t, "no error", response, joinGroupResponseV2, 2) if response.ThrottleTime != 100 { diff --git a/kerberos_client_test.go b/kerberos_client_test.go index 7c2ed31bc..b1a6d48fd 100644 --- a/kerberos_client_test.go +++ b/kerberos_client_test.go @@ -58,6 +58,7 @@ const ( ) func TestFaildToCreateKerberosConfig(t *testing.T) { + t.Parallel() expectedErr := errors.New("configuration file could not be opened: krb5.conf open krb5.conf: no such file or directory") clientConfig := NewTestConfig() clientConfig.Net.SASL.Mechanism = SASLTypeGSSAPI @@ -76,6 +77,7 @@ func TestFaildToCreateKerberosConfig(t *testing.T) { } func TestCreateWithPassword(t *testing.T) { + t.Parallel() kerberosConfig, err := krbcfg.NewFromString(krb5cfg) if err != nil { t.Fatal(err) @@ -106,6 +108,7 @@ func TestCreateWithPassword(t *testing.T) { } func TestCreateWithKeyTab(t *testing.T) { + t.Parallel() kerberosConfig, err := krbcfg.NewFromString(krb5cfg) if err != nil { t.Fatal(err) @@ -128,6 +131,7 @@ func TestCreateWithKeyTab(t *testing.T) { } func TestCreateWithDisablePAFXFAST(t *testing.T) { + t.Parallel() kerberosConfig, err := krbcfg.NewFromString(krb5cfg) if err != nil { t.Fatal(err) diff --git a/leave_group_request_test.go b/leave_group_request_test.go index b674e48b2..978c44962 100644 --- a/leave_group_request_test.go +++ b/leave_group_request_test.go @@ -8,6 +8,7 @@ var basicLeaveGroupRequest = []byte{ } func TestLeaveGroupRequest(t *testing.T) { + t.Parallel() request := new(LeaveGroupRequest) request.GroupId = "foo" request.MemberId = "bar" diff --git a/leave_group_response_test.go b/leave_group_response_test.go index 9207c6668..4a24334cc 100644 --- a/leave_group_response_test.go +++ b/leave_group_response_test.go @@ -8,6 +8,7 @@ var ( ) func TestLeaveGroupResponse(t *testing.T) { + t.Parallel() var response *LeaveGroupResponse response = new(LeaveGroupResponse) diff --git a/list_groups_request_test.go b/list_groups_request_test.go index 2e977d9a5..f5d06da16 100644 --- a/list_groups_request_test.go +++ b/list_groups_request_test.go @@ -3,5 +3,6 @@ package sarama import "testing" func TestListGroupsRequest(t *testing.T) { + t.Parallel() testRequest(t, "ListGroupsRequest", &ListGroupsRequest{}, []byte{}) } diff --git a/list_groups_response_test.go b/list_groups_response_test.go index 41ab822f9..1c21279d2 100644 --- a/list_groups_response_test.go +++ b/list_groups_response_test.go @@ -24,6 +24,7 @@ var ( ) func TestListGroupsResponse(t *testing.T) { + t.Parallel() var response *ListGroupsResponse response = new(ListGroupsResponse) diff --git a/list_partition_reassignments_request_test.go b/list_partition_reassignments_request_test.go index e50b289b9..ec5215b6e 100644 --- a/list_partition_reassignments_request_test.go +++ b/list_partition_reassignments_request_test.go @@ -12,6 +12,7 @@ var listPartitionReassignmentsRequestOneBlock = []byte{ } func TestListPartitionReassignmentRequest(t *testing.T) { + t.Parallel() var request *ListPartitionReassignmentsRequest = &ListPartitionReassignmentsRequest{ TimeoutMs: int32(10000), Version: int16(0), diff --git a/list_partition_reassignments_response_test.go b/list_partition_reassignments_response_test.go index 71532423d..85338ab40 100644 --- a/list_partition_reassignments_response_test.go +++ b/list_partition_reassignments_response_test.go @@ -17,6 +17,7 @@ var listPartitionReassignmentsResponse = []byte{ } func TestListPartitionReassignmentResponse(t *testing.T) { + t.Parallel() var response *ListPartitionReassignmentsResponse = &ListPartitionReassignmentsResponse{ ThrottleTimeMs: int32(10000), Version: int16(0), diff --git a/message_test.go b/message_test.go index a6c7cff2a..134130298 100644 --- a/message_test.go +++ b/message_test.go @@ -117,6 +117,7 @@ var ( ) func TestMessageEncoding(t *testing.T) { + t.Parallel() message := Message{} testEncodable(t, "empty", &message, emptyMessage) @@ -138,6 +139,7 @@ func TestMessageEncoding(t *testing.T) { } func TestMessageDecoding(t *testing.T) { + t.Parallel() message := Message{} testDecodable(t, "empty", &message, emptyMessage) if message.Codec != CompressionNone { @@ -166,6 +168,7 @@ func TestMessageDecoding(t *testing.T) { } func TestMessageDecodingBulkSnappy(t *testing.T) { + t.Parallel() message := Message{} testDecodable(t, "bulk snappy", &message, emptyBulkSnappyMessage) if message.Codec != CompressionSnappy { @@ -182,6 +185,7 @@ func TestMessageDecodingBulkSnappy(t *testing.T) { } func TestMessageDecodingBulkGzip(t *testing.T) { + t.Parallel() message := Message{} testDecodable(t, "bulk gzip", &message, emptyBulkGzipMessage) if message.Codec != CompressionGZIP { @@ -198,6 +202,7 @@ func TestMessageDecodingBulkGzip(t *testing.T) { } func TestMessageDecodingBulkLZ4(t *testing.T) { + t.Parallel() message := Message{} testDecodable(t, "bulk lz4", &message, emptyBulkLZ4Message) if message.Codec != CompressionLZ4 { @@ -214,6 +219,7 @@ func TestMessageDecodingBulkLZ4(t *testing.T) { } func TestMessageDecodingBulkZSTD(t *testing.T) { + t.Parallel() message := Message{} testDecodable(t, "bulk zstd", &message, emptyBulkZSTDMessage) if message.Codec != CompressionZSTD { @@ -230,11 +236,13 @@ func TestMessageDecodingBulkZSTD(t *testing.T) { } func TestMessageDecodingVersion1(t *testing.T) { + t.Parallel() message := Message{Version: 1} testDecodable(t, "decoding empty v1 message", &message, emptyV1Message) } func TestMessageDecodingUnknownVersions(t *testing.T) { + t.Parallel() message := Message{Version: 2} err := decode(emptyV2Message, &message) if err == nil { diff --git a/metadata_request_test.go b/metadata_request_test.go index 16f6b5942..c71dc7c82 100644 --- a/metadata_request_test.go +++ b/metadata_request_test.go @@ -63,6 +63,7 @@ var ( ) func TestMetadataRequestV0(t *testing.T) { + t.Parallel() request := new(MetadataRequest) testRequest(t, "no topics", request, metadataRequestNoTopicsV0) @@ -74,6 +75,7 @@ func TestMetadataRequestV0(t *testing.T) { } func TestMetadataRequestV1(t *testing.T) { + t.Parallel() request := new(MetadataRequest) request.Version = 1 testRequest(t, "no topics", request, metadataRequestNoTopicsV1) @@ -86,6 +88,7 @@ func TestMetadataRequestV1(t *testing.T) { } func TestMetadataRequestV2(t *testing.T) { + t.Parallel() request := new(MetadataRequest) request.Version = 2 testRequest(t, "no topics", request, metadataRequestNoTopicsV2) @@ -98,6 +101,7 @@ func TestMetadataRequestV2(t *testing.T) { } func TestMetadataRequestV3(t *testing.T) { + t.Parallel() request := new(MetadataRequest) request.Version = 3 testRequest(t, "no topics", request, metadataRequestNoTopicsV3) @@ -110,6 +114,7 @@ func TestMetadataRequestV3(t *testing.T) { } func TestMetadataRequestV4(t *testing.T) { + t.Parallel() request := new(MetadataRequest) request.Version = 4 testRequest(t, "no topics", request, metadataRequestNoTopicsV4) @@ -124,6 +129,7 @@ func TestMetadataRequestV4(t *testing.T) { } func TestMetadataRequestV5(t *testing.T) { + t.Parallel() request := new(MetadataRequest) request.Version = 5 testRequest(t, "no topics", request, metadataRequestNoTopicsV5) diff --git a/metadata_response_test.go b/metadata_response_test.go index 3970a2f31..248d9b05c 100644 --- a/metadata_response_test.go +++ b/metadata_response_test.go @@ -109,6 +109,7 @@ var ( ) func TestEmptyMetadataResponseV0(t *testing.T) { + t.Parallel() response := MetadataResponse{} testVersionDecodable(t, "empty, V0", &response, emptyMetadataResponseV0, 0) @@ -121,6 +122,7 @@ func TestEmptyMetadataResponseV0(t *testing.T) { } func TestMetadataResponseWithBrokersV0(t *testing.T) { + t.Parallel() response := MetadataResponse{} testVersionDecodable(t, "brokers, no topics, V0", &response, brokersNoTopicsMetadataResponseV0, 0) @@ -147,6 +149,7 @@ func TestMetadataResponseWithBrokersV0(t *testing.T) { } func TestMetadataResponseWithTopicsV0(t *testing.T) { + t.Parallel() response := MetadataResponse{} testVersionDecodable(t, "topics, no brokers, V0", &response, topicsNoBrokersMetadataResponseV0, 0) @@ -209,6 +212,7 @@ func TestMetadataResponseWithTopicsV0(t *testing.T) { } func TestMetadataResponseWithBrokersV1(t *testing.T) { + t.Parallel() response := MetadataResponse{} testVersionDecodable(t, "topics, V1", &response, brokersNoTopicsMetadataResponseV1, 1) @@ -230,6 +234,7 @@ func TestMetadataResponseWithBrokersV1(t *testing.T) { } func TestMetadataResponseWithTopicsV1(t *testing.T) { + t.Parallel() response := MetadataResponse{} testVersionDecodable(t, "topics, V1", &response, topicsNoBrokersMetadataResponseV1, 1) @@ -251,6 +256,7 @@ func TestMetadataResponseWithTopicsV1(t *testing.T) { } func TestMetadataResponseWithThrottleTime(t *testing.T) { + t.Parallel() response := MetadataResponse{} testVersionDecodable(t, "no topics, no brokers, throttle time and cluster Id V3", &response, noBrokersNoTopicsWithThrottleTimeAndClusterIDV3, 3) @@ -272,6 +278,7 @@ func TestMetadataResponseWithThrottleTime(t *testing.T) { } func TestMetadataResponseWithOfflineReplicasV5(t *testing.T) { + t.Parallel() response := MetadataResponse{} testVersionDecodable(t, "no brokers, 1 topic with offline replica V5", &response, noBrokersOneTopicWithOfflineReplicasV5, 5) diff --git a/metrics_test.go b/metrics_test.go index 7572f5b90..7491196d7 100644 --- a/metrics_test.go +++ b/metrics_test.go @@ -7,6 +7,7 @@ import ( ) func TestGetOrRegisterHistogram(t *testing.T) { + t.Parallel() metricRegistry := metrics.NewRegistry() histogram := getOrRegisterHistogram("name", metricRegistry) @@ -30,6 +31,7 @@ func TestGetOrRegisterHistogram(t *testing.T) { } func TestGetMetricNameForBroker(t *testing.T) { + t.Parallel() metricName := getMetricNameForBroker("name", &Broker{id: 1}) if metricName != "name-for-broker-1" { diff --git a/mocks/async_producer_test.go b/mocks/async_producer_test.go index 29ed9238f..666c93b05 100644 --- a/mocks/async_producer_test.go +++ b/mocks/async_producer_test.go @@ -36,6 +36,7 @@ func (trm *testReporterMock) Errorf(format string, args ...interface{}) { } func TestMockAsyncProducerImplementsAsyncProducerInterface(t *testing.T) { + t.Parallel() var mp interface{} = &AsyncProducer{} if _, ok := mp.(sarama.AsyncProducer); !ok { t.Error("The mock producer should implement the sarama.Producer interface.") @@ -43,6 +44,7 @@ func TestMockAsyncProducerImplementsAsyncProducerInterface(t *testing.T) { } func TestProducerReturnsExpectationsToChannels(t *testing.T) { + t.Parallel() config := NewTestConfig() config.Producer.Return.Successes = true mp := NewAsyncProducer(t, config). @@ -76,6 +78,7 @@ func TestProducerReturnsExpectationsToChannels(t *testing.T) { } func TestProducerWithTooFewExpectations(t *testing.T) { + t.Parallel() trm := newTestReporterMock() mp := NewAsyncProducer(trm, nil) mp.ExpectInputAndSucceed() @@ -93,6 +96,7 @@ func TestProducerWithTooFewExpectations(t *testing.T) { } func TestProducerWithTooManyExpectations(t *testing.T) { + t.Parallel() trm := newTestReporterMock() mp := NewAsyncProducer(trm, nil). ExpectInputAndSucceed(). @@ -109,6 +113,7 @@ func TestProducerWithTooManyExpectations(t *testing.T) { } func TestProducerWithCheckerFunction(t *testing.T) { + t.Parallel() trm := newTestReporterMock() mp := NewAsyncProducer(trm, nil). ExpectInputWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes")). @@ -131,6 +136,7 @@ func TestProducerWithCheckerFunction(t *testing.T) { } func TestProducerWithBrokenPartitioner(t *testing.T) { + t.Parallel() trm := newTestReporterMock() config := sarama.NewConfig() config.Producer.Partitioner = func(string) sarama.Partitioner { diff --git a/mocks/consumer_test.go b/mocks/consumer_test.go index 2a408af97..2efe183d1 100644 --- a/mocks/consumer_test.go +++ b/mocks/consumer_test.go @@ -8,6 +8,7 @@ import ( ) func TestMockConsumerImplementsConsumerInterface(t *testing.T) { + t.Parallel() var c interface{} = &Consumer{} if _, ok := c.(sarama.Consumer); !ok { t.Error("The mock consumer should implement the sarama.Consumer interface.") @@ -20,6 +21,7 @@ func TestMockConsumerImplementsConsumerInterface(t *testing.T) { } func TestConsumerHandlesExpectations(t *testing.T) { + t.Parallel() consumer := NewConsumer(t, NewTestConfig()) defer func() { if err := consumer.Close(); err != nil { @@ -65,6 +67,7 @@ func TestConsumerHandlesExpectations(t *testing.T) { } func TestConsumerHandlesExpectationsPausingResuming(t *testing.T) { + t.Parallel() consumer := NewConsumer(t, NewTestConfig()) defer func() { if err := consumer.Close(); err != nil { @@ -144,6 +147,7 @@ func TestConsumerHandlesExpectationsPausingResuming(t *testing.T) { } func TestConsumerReturnsNonconsumedErrorsOnClose(t *testing.T) { + t.Parallel() consumer := NewConsumer(t, NewTestConfig()) consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest).YieldError(sarama.ErrOutOfBrokers) consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest).YieldError(sarama.ErrOutOfBrokers) @@ -169,6 +173,7 @@ func TestConsumerReturnsNonconsumedErrorsOnClose(t *testing.T) { } func TestConsumerWithoutExpectationsOnPartition(t *testing.T) { + t.Parallel() trm := newTestReporterMock() consumer := NewConsumer(trm, NewTestConfig()) @@ -187,6 +192,7 @@ func TestConsumerWithoutExpectationsOnPartition(t *testing.T) { } func TestConsumerWithExpectationsOnUnconsumedPartition(t *testing.T) { + t.Parallel() trm := newTestReporterMock() consumer := NewConsumer(trm, NewTestConfig()) consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest).YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello world")}) @@ -201,6 +207,7 @@ func TestConsumerWithExpectationsOnUnconsumedPartition(t *testing.T) { } func TestConsumerWithWrongOffsetExpectation(t *testing.T) { + t.Parallel() trm := newTestReporterMock() consumer := NewConsumer(trm, NewTestConfig()) consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest) @@ -220,6 +227,7 @@ func TestConsumerWithWrongOffsetExpectation(t *testing.T) { } func TestConsumerViolatesMessagesDrainedExpectation(t *testing.T) { + t.Parallel() trm := newTestReporterMock() consumer := NewConsumer(trm, NewTestConfig()) consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest). @@ -245,6 +253,7 @@ func TestConsumerViolatesMessagesDrainedExpectation(t *testing.T) { } func TestConsumerMeetsErrorsDrainedExpectation(t *testing.T) { + t.Parallel() trm := newTestReporterMock() consumer := NewConsumer(trm, NewTestConfig()) @@ -272,6 +281,7 @@ func TestConsumerMeetsErrorsDrainedExpectation(t *testing.T) { } func TestConsumerTopicMetadata(t *testing.T) { + t.Parallel() trm := newTestReporterMock() consumer := NewConsumer(trm, NewTestConfig()) @@ -315,6 +325,7 @@ func TestConsumerTopicMetadata(t *testing.T) { } func TestConsumerUnexpectedTopicMetadata(t *testing.T) { + t.Parallel() trm := newTestReporterMock() consumer := NewConsumer(trm, NewTestConfig()) @@ -328,6 +339,7 @@ func TestConsumerUnexpectedTopicMetadata(t *testing.T) { } func TestConsumerOffsetsAreManagedCorrectlyWithOffsetOldest(t *testing.T) { + t.Parallel() trm := newTestReporterMock() consumer := NewConsumer(trm, NewTestConfig()) pcmock := consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest) @@ -360,6 +372,7 @@ func TestConsumerOffsetsAreManagedCorrectlyWithOffsetOldest(t *testing.T) { } func TestConsumerOffsetsAreManagedCorrectlyWithSpecifiedOffset(t *testing.T) { + t.Parallel() startingOffset := int64(123) trm := newTestReporterMock() consumer := NewConsumer(trm, NewTestConfig()) diff --git a/mocks/sync_producer_test.go b/mocks/sync_producer_test.go index ca1f65f5d..00b876394 100644 --- a/mocks/sync_producer_test.go +++ b/mocks/sync_producer_test.go @@ -9,6 +9,7 @@ import ( ) func TestMockSyncProducerImplementsSyncProducerInterface(t *testing.T) { + t.Parallel() var mp interface{} = &SyncProducer{} if _, ok := mp.(sarama.SyncProducer); !ok { t.Error("The mock async producer should implement the sarama.SyncProducer interface.") @@ -16,6 +17,7 @@ func TestMockSyncProducerImplementsSyncProducerInterface(t *testing.T) { } func TestSyncProducerReturnsExpectationsToSendMessage(t *testing.T) { + t.Parallel() sp := NewSyncProducer(t, nil) defer func() { if err := sp.Close(); err != nil { @@ -56,6 +58,7 @@ func TestSyncProducerReturnsExpectationsToSendMessage(t *testing.T) { } func TestSyncProducerWithTooManyExpectations(t *testing.T) { + t.Parallel() trm := newTestReporterMock() sp := NewSyncProducer(trm, nil). @@ -77,6 +80,7 @@ func TestSyncProducerWithTooManyExpectations(t *testing.T) { } func TestSyncProducerWithTooFewExpectations(t *testing.T) { + t.Parallel() trm := newTestReporterMock() sp := NewSyncProducer(trm, nil).ExpectSendMessageAndSucceed() @@ -99,6 +103,7 @@ func TestSyncProducerWithTooFewExpectations(t *testing.T) { } func TestSyncProducerWithCheckerFunction(t *testing.T) { + t.Parallel() trm := newTestReporterMock() sp := NewSyncProducer(trm, nil). @@ -124,6 +129,7 @@ func TestSyncProducerWithCheckerFunction(t *testing.T) { } func TestSyncProducerWithCheckerFunctionForSendMessagesWithError(t *testing.T) { + t.Parallel() trm := newTestReporterMock() sp := NewSyncProducer(trm, nil). @@ -148,6 +154,7 @@ func TestSyncProducerWithCheckerFunctionForSendMessagesWithError(t *testing.T) { } func TestSyncProducerWithCheckerFunctionForSendMessagesWithoutError(t *testing.T) { + t.Parallel() trm := newTestReporterMock() sp := NewSyncProducer(trm, nil). @@ -177,6 +184,7 @@ func TestSyncProducerWithCheckerFunctionForSendMessagesWithoutError(t *testing.T } func TestSyncProducerSendMessagesExpectationsMismatchTooFew(t *testing.T) { + t.Parallel() trm := newTestReporterMock() sp := NewSyncProducer(trm, nil). @@ -201,6 +209,7 @@ func TestSyncProducerSendMessagesExpectationsMismatchTooFew(t *testing.T) { } func TestSyncProducerSendMessagesExpectationsMismatchTooMany(t *testing.T) { + t.Parallel() trm := newTestReporterMock() sp := NewSyncProducer(trm, nil). @@ -224,6 +233,7 @@ func TestSyncProducerSendMessagesExpectationsMismatchTooMany(t *testing.T) { } func TestSyncProducerSendMessagesFaultyEncoder(t *testing.T) { + t.Parallel() trm := newTestReporterMock() sp := NewSyncProducer(trm, nil). diff --git a/offset_commit_request_test.go b/offset_commit_request_test.go index 06bbd4036..8d6ba003b 100644 --- a/offset_commit_request_test.go +++ b/offset_commit_request_test.go @@ -64,6 +64,7 @@ var ( ) func TestOffsetCommitRequestV0(t *testing.T) { + t.Parallel() request := new(OffsetCommitRequest) request.Version = 0 request.ConsumerGroup = "foobar" @@ -74,6 +75,7 @@ func TestOffsetCommitRequestV0(t *testing.T) { } func TestOffsetCommitRequestV1(t *testing.T) { + t.Parallel() request := new(OffsetCommitRequest) request.ConsumerGroup = "foobar" request.ConsumerID = "cons" @@ -86,6 +88,7 @@ func TestOffsetCommitRequestV1(t *testing.T) { } func TestOffsetCommitRequestV2ToV4(t *testing.T) { + t.Parallel() for version := 2; version <= 4; version++ { request := new(OffsetCommitRequest) request.ConsumerGroup = "foobar" diff --git a/offset_commit_response_test.go b/offset_commit_response_test.go index f35ca54ec..7e0bb295c 100644 --- a/offset_commit_response_test.go +++ b/offset_commit_response_test.go @@ -10,11 +10,13 @@ var emptyOffsetCommitResponse = []byte{ } func TestEmptyOffsetCommitResponse(t *testing.T) { + t.Parallel() response := OffsetCommitResponse{} testResponse(t, "empty", &response, emptyOffsetCommitResponse) } func TestNormalOffsetCommitResponse(t *testing.T) { + t.Parallel() response := OffsetCommitResponse{} response.AddError("t", 0, ErrNotLeaderForPartition) response.Errors["m"] = make(map[int32]KError) @@ -24,6 +26,7 @@ func TestNormalOffsetCommitResponse(t *testing.T) { } func TestOffsetCommitResponseWithThrottleTime(t *testing.T) { + t.Parallel() for version := 3; version <= 4; version++ { response := OffsetCommitResponse{ Version: int16(version), diff --git a/offset_fetch_request_test.go b/offset_fetch_request_test.go index d4497a8b8..6657b05e5 100644 --- a/offset_fetch_request_test.go +++ b/offset_fetch_request_test.go @@ -55,6 +55,7 @@ var ( ) func TestOffsetFetchRequestNoPartitions(t *testing.T) { + t.Parallel() for version := 0; version <= 5; version++ { request := new(OffsetFetchRequest) request.Version = int16(version) @@ -88,6 +89,7 @@ func TestOffsetFetchRequestNoPartitions(t *testing.T) { } func TestOffsetFetchRequest(t *testing.T) { + t.Parallel() for version := 0; version <= 5; version++ { request := new(OffsetFetchRequest) request.Version = int16(version) @@ -116,6 +118,7 @@ func TestOffsetFetchRequest(t *testing.T) { } func TestOffsetFetchRequestAllPartitions(t *testing.T) { + t.Parallel() for version := 2; version <= 5; version++ { request := &OffsetFetchRequest{Version: int16(version), ConsumerGroup: "blah"} testRequest(t, fmt.Sprintf("all partitions %d", version), request, offsetFetchRequestAllPartitions) diff --git a/offset_fetch_response_test.go b/offset_fetch_response_test.go index d70894ab2..2801a46d9 100644 --- a/offset_fetch_response_test.go +++ b/offset_fetch_response_test.go @@ -23,6 +23,7 @@ var ( ) func TestEmptyOffsetFetchResponse(t *testing.T) { + t.Parallel() for version := 0; version <= 1; version++ { response := OffsetFetchResponse{Version: int16(version)} testResponse(t, fmt.Sprintf("empty v%d", version), &response, emptyOffsetFetchResponse) @@ -41,6 +42,7 @@ func TestNormalOffsetFetchResponse(t *testing.T) { // The response encoded form cannot be checked for it varies due to // unpredictable map traversal order. // Hence the 'nil' as byte[] parameter in the 'testResponse(..)' calls + t.Parallel() for version := 0; version <= 1; version++ { response := OffsetFetchResponse{Version: int16(version)} diff --git a/offset_manager_test.go b/offset_manager_test.go index 249e53444..36ea0a0d2 100644 --- a/offset_manager_test.go +++ b/offset_manager_test.go @@ -72,6 +72,7 @@ func initPartitionOffsetManager(t *testing.T, om OffsetManager, } func TestNewOffsetManager(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) seedBroker.Returns(new(MetadataResponse)) defer seedBroker.Close() @@ -117,9 +118,12 @@ var offsetsautocommitTestTable = []struct { } func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) { + t.Parallel() // Tests to validate configuration of `Consumer.Offsets.AutoCommit.Enable` for _, tt := range offsetsautocommitTestTable { + tt := tt t.Run(tt.name, func(t *testing.T) { + t.Parallel() config := NewTestConfig() if tt.set { config.Consumer.Offsets.AutoCommit.Enable = tt.enable @@ -170,6 +174,7 @@ func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) { } func TestNewOffsetManagerOffsetsManualCommit(t *testing.T) { + t.Parallel() // Tests to validate configuration when `Consumer.Offsets.AutoCommit.Enable` is false config := NewTestConfig() config.Consumer.Offsets.AutoCommit.Enable = false @@ -230,6 +235,7 @@ func TestNewOffsetManagerOffsetsManualCommit(t *testing.T) { // Test recovery from ErrNotCoordinatorForConsumer // on first fetchInitialOffset call func TestOffsetManagerFetchInitialFail(t *testing.T) { + t.Parallel() om, testClient, broker, coordinator := initOffsetManager(t, 0) // Error on first fetchInitialOffset call @@ -273,6 +279,7 @@ func TestOffsetManagerFetchInitialFail(t *testing.T) { // Test fetchInitialOffset retry on ErrOffsetsLoadInProgress func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) { + t.Parallel() retryCount := int32(0) backoff := func(retries, maxRetries int) time.Duration { atomic.AddInt32(&retryCount, 1) @@ -315,6 +322,7 @@ func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) { } func TestPartitionOffsetManagerInitialOffset(t *testing.T) { + t.Parallel() om, testClient, broker, coordinator := initOffsetManager(t, 0) testClient.Config().Consumer.Offsets.Initial = OffsetOldest @@ -337,6 +345,7 @@ func TestPartitionOffsetManagerInitialOffset(t *testing.T) { } func TestPartitionOffsetManagerNextOffset(t *testing.T) { + t.Parallel() om, testClient, broker, coordinator := initOffsetManager(t, 0) pom := initPartitionOffsetManager(t, om, coordinator, 5, "test_meta") @@ -356,6 +365,7 @@ func TestPartitionOffsetManagerNextOffset(t *testing.T) { } func TestPartitionOffsetManagerResetOffset(t *testing.T) { + t.Parallel() om, testClient, broker, coordinator := initOffsetManager(t, 0) pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta") @@ -382,6 +392,7 @@ func TestPartitionOffsetManagerResetOffset(t *testing.T) { } func TestPartitionOffsetManagerResetOffsetWithRetention(t *testing.T) { + t.Parallel() om, testClient, broker, coordinator := initOffsetManager(t, time.Hour) pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta") @@ -418,6 +429,7 @@ func TestPartitionOffsetManagerResetOffsetWithRetention(t *testing.T) { } func TestPartitionOffsetManagerMarkOffset(t *testing.T) { + t.Parallel() om, testClient, broker, coordinator := initOffsetManager(t, 0) pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta") @@ -443,6 +455,7 @@ func TestPartitionOffsetManagerMarkOffset(t *testing.T) { } func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) { + t.Parallel() om, testClient, broker, coordinator := initOffsetManager(t, time.Hour) pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta") @@ -478,6 +491,7 @@ func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) { } func TestPartitionOffsetManagerCommitErr(t *testing.T) { + t.Parallel() om, testClient, broker, coordinator := initOffsetManager(t, 0) pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta") @@ -542,6 +556,7 @@ func TestPartitionOffsetManagerCommitErr(t *testing.T) { // Test of recovery from abort func TestAbortPartitionOffsetManager(t *testing.T) { + t.Parallel() om, testClient, broker, coordinator := initOffsetManager(t, 0) pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta") diff --git a/offset_request_test.go b/offset_request_test.go index 8ea533a6b..57dd5f7c7 100644 --- a/offset_request_test.go +++ b/offset_request_test.go @@ -49,6 +49,7 @@ var ( ) func TestOffsetRequest(t *testing.T) { + t.Parallel() request := new(OffsetRequest) testRequest(t, "no blocks", request, offsetRequestNoBlocksV1) @@ -57,6 +58,7 @@ func TestOffsetRequest(t *testing.T) { } func TestOffsetRequestV1(t *testing.T) { + t.Parallel() request := new(OffsetRequest) request.Version = 1 testRequest(t, "no blocks", request, offsetRequestNoBlocksV1) @@ -66,6 +68,7 @@ func TestOffsetRequestV1(t *testing.T) { } func TestOffsetRequestV2(t *testing.T) { + t.Parallel() request := new(OffsetRequest) request.Version = 2 testRequest(t, "no blocks", request, offsetRequestNoBlocksV2) @@ -76,6 +79,7 @@ func TestOffsetRequestV2(t *testing.T) { } func TestOffsetRequestReplicaID(t *testing.T) { + t.Parallel() request := new(OffsetRequest) replicaID := int32(42) request.SetReplicaID(replicaID) diff --git a/offset_response_test.go b/offset_response_test.go index 683c11e43..8dcb3bfdd 100644 --- a/offset_response_test.go +++ b/offset_response_test.go @@ -38,6 +38,7 @@ var ( ) func TestEmptyOffsetResponse(t *testing.T) { + t.Parallel() response := OffsetResponse{} testVersionDecodable(t, "empty", &response, emptyOffsetResponse, 0) @@ -54,6 +55,7 @@ func TestEmptyOffsetResponse(t *testing.T) { } func TestNormalOffsetResponse(t *testing.T) { + t.Parallel() response := OffsetResponse{} testVersionDecodable(t, "normal", &response, normalOffsetResponse, 0) @@ -84,6 +86,7 @@ func TestNormalOffsetResponse(t *testing.T) { } func TestNormalOffsetResponseV1(t *testing.T) { + t.Parallel() response := OffsetResponse{} testVersionDecodable(t, "normal", &response, normalOffsetResponseV1, 1) diff --git a/partitioner_test.go b/partitioner_test.go index 06a4ad725..230f92c26 100644 --- a/partitioner_test.go +++ b/partitioner_test.go @@ -27,6 +27,7 @@ func assertPartitioningConsistent(t *testing.T, partitioner Partitioner, message } func TestRandomPartitioner(t *testing.T) { + t.Parallel() partitioner := NewRandomPartitioner("mytopic") choice, err := partitioner.Partition(nil, 1) @@ -49,6 +50,7 @@ func TestRandomPartitioner(t *testing.T) { } func TestRoundRobinPartitioner(t *testing.T) { + t.Parallel() partitioner := NewRoundRobinPartitioner("mytopic") choice, err := partitioner.Partition(nil, 1) @@ -72,6 +74,7 @@ func TestRoundRobinPartitioner(t *testing.T) { } func TestNewHashPartitionerWithHasher(t *testing.T) { + t.Parallel() // use the current default hasher fnv.New32a() partitioner := NewCustomHashPartitioner(fnv.New32a)("mytopic") @@ -103,6 +106,7 @@ func TestNewHashPartitionerWithHasher(t *testing.T) { } func TestHashPartitionerWithHasherMinInt32(t *testing.T) { + t.Parallel() // use the current default hasher fnv.New32a() partitioner := NewCustomHashPartitioner(fnv.New32a)("mytopic") @@ -121,6 +125,7 @@ func TestHashPartitionerWithHasherMinInt32(t *testing.T) { } func TestHashPartitioner(t *testing.T) { + t.Parallel() partitioner := NewHashPartitioner("mytopic") choice, err := partitioner.Partition(&ProducerMessage{}, 1) @@ -151,6 +156,7 @@ func TestHashPartitioner(t *testing.T) { } func TestHashPartitionerConsistency(t *testing.T) { + t.Parallel() partitioner := NewHashPartitioner("mytopic") ep, ok := partitioner.(DynamicConsistencyPartitioner) @@ -169,6 +175,7 @@ func TestHashPartitionerConsistency(t *testing.T) { } func TestHashPartitionerMinInt32(t *testing.T) { + t.Parallel() partitioner := NewHashPartitioner("mytopic") msg := ProducerMessage{} @@ -186,6 +193,7 @@ func TestHashPartitionerMinInt32(t *testing.T) { } func TestManualPartitioner(t *testing.T) { + t.Parallel() partitioner := NewManualPartitioner("mytopic") choice, err := partitioner.Partition(&ProducerMessage{}, 1) @@ -208,6 +216,7 @@ func TestManualPartitioner(t *testing.T) { } func TestWithCustomFallbackPartitioner(t *testing.T) { + t.Parallel() topic := "mytopic" partitioner := NewCustomPartitioner( diff --git a/produce_request_test.go b/produce_request_test.go index e8684ffd2..628b7f905 100644 --- a/produce_request_test.go +++ b/produce_request_test.go @@ -74,6 +74,7 @@ var ( ) func TestProduceRequest(t *testing.T) { + t.Parallel() request := new(ProduceRequest) testRequest(t, "empty", request, produceRequestEmpty) diff --git a/produce_response_test.go b/produce_response_test.go index 0509bb569..b989e5a08 100644 --- a/produce_response_test.go +++ b/produce_response_test.go @@ -66,6 +66,7 @@ var ( ) func TestProduceResponseDecode(t *testing.T) { + t.Parallel() response := ProduceResponse{} testVersionDecodable(t, "no blocks", &response, produceResponseNoBlocksV0, 0) @@ -112,6 +113,7 @@ func TestProduceResponseDecode(t *testing.T) { } func TestProduceResponseEncode(t *testing.T) { + t.Parallel() response := ProduceResponse{} response.Blocks = make(map[string]map[int32]*ProduceResponseBlock) testEncodable(t, "empty", &response, produceResponseNoBlocksV0) @@ -131,6 +133,7 @@ func TestProduceResponseEncode(t *testing.T) { } func TestProduceResponseEncodeInvalidTimestamp(t *testing.T) { + t.Parallel() response := ProduceResponse{} response.Version = 2 response.Blocks = make(map[string]map[int32]*ProduceResponseBlock) diff --git a/produce_set_test.go b/produce_set_test.go index 652d6d7d9..2dede467f 100644 --- a/produce_set_test.go +++ b/produce_set_test.go @@ -23,6 +23,7 @@ func safeAddMessage(t *testing.T, ps *produceSet, msg *ProducerMessage) { } func TestProduceSetInitial(t *testing.T) { + t.Parallel() _, ps := makeProduceSet() if !ps.empty() { @@ -35,6 +36,7 @@ func TestProduceSetInitial(t *testing.T) { } func TestProduceSetAddingMessages(t *testing.T) { + t.Parallel() _, ps := makeProduceSet() msg := &ProducerMessage{Key: StringEncoder(TestMessage), Value: StringEncoder(TestMessage)} @@ -50,6 +52,7 @@ func TestProduceSetAddingMessages(t *testing.T) { } func TestProduceSetAddingMessagesOverflowMessagesLimit(t *testing.T) { + t.Parallel() parent, ps := makeProduceSet() parent.conf.Producer.Flush.MaxMessages = 1000 @@ -68,6 +71,7 @@ func TestProduceSetAddingMessagesOverflowMessagesLimit(t *testing.T) { } func TestProduceSetAddingMessagesOverflowBytesLimit(t *testing.T) { + t.Parallel() parent, ps := makeProduceSet() parent.conf.Producer.MaxMessageBytes = 1000 @@ -86,6 +90,7 @@ func TestProduceSetAddingMessagesOverflowBytesLimit(t *testing.T) { } func TestProduceSetPartitionTracking(t *testing.T) { + t.Parallel() _, ps := makeProduceSet() m1 := &ProducerMessage{Topic: "t1", Partition: 0} @@ -133,6 +138,7 @@ func TestProduceSetPartitionTracking(t *testing.T) { } func TestProduceSetRequestBuilding(t *testing.T) { + t.Parallel() parent, ps := makeProduceSet() parent.conf.Producer.RequiredAcks = WaitForAll parent.conf.Producer.Timeout = 10 * time.Second @@ -171,6 +177,7 @@ func TestProduceSetRequestBuilding(t *testing.T) { } func TestProduceSetCompressedRequestBuilding(t *testing.T) { + t.Parallel() parent, ps := makeProduceSet() parent.conf.Producer.RequiredAcks = WaitForAll parent.conf.Producer.Timeout = 10 * time.Second @@ -216,6 +223,7 @@ func TestProduceSetCompressedRequestBuilding(t *testing.T) { } func TestProduceSetV3RequestBuilding(t *testing.T) { + t.Parallel() parent, ps := makeProduceSet() parent.conf.Producer.RequiredAcks = WaitForAll parent.conf.Producer.Timeout = 10 * time.Second @@ -282,6 +290,7 @@ func TestProduceSetV3RequestBuilding(t *testing.T) { } func TestProduceSetIdempotentRequestBuilding(t *testing.T) { + t.Parallel() const pID = 1000 const pEpoch = 1234 @@ -370,6 +379,7 @@ func TestProduceSetIdempotentRequestBuilding(t *testing.T) { } func TestProduceSetConsistentTimestamps(t *testing.T) { + t.Parallel() parent, ps1 := makeProduceSet() ps2 := newProduceSet(parent) parent.conf.Producer.RequiredAcks = WaitForAll diff --git a/record_test.go b/record_test.go index 1aceeda2c..f5ef2d22d 100644 --- a/record_test.go +++ b/record_test.go @@ -263,6 +263,7 @@ func isOldGo(t *testing.T) bool { } func TestRecordBatchEncoding(t *testing.T) { + t.Parallel() for _, tc := range recordBatchTestCases { if tc.oldGoEncoded != nil && isOldGo(t) { testEncodable(t, tc.name, &tc.batch, tc.oldGoEncoded) @@ -273,6 +274,7 @@ func TestRecordBatchEncoding(t *testing.T) { } func TestRecordBatchDecoding(t *testing.T) { + t.Parallel() for _, tc := range recordBatchTestCases { batch := RecordBatch{} testDecodable(t, tc.name, &batch, tc.encoded) diff --git a/records_test.go b/records_test.go index 34f1b4a6e..fd985f56c 100644 --- a/records_test.go +++ b/records_test.go @@ -7,6 +7,7 @@ import ( ) func TestLegacyRecords(t *testing.T) { + t.Parallel() set := &MessageSet{ Messages: []*MessageBlock{ { @@ -82,6 +83,7 @@ func TestLegacyRecords(t *testing.T) { } func TestDefaultRecords(t *testing.T) { + t.Parallel() batch := &RecordBatch{ IsTransactional: true, Version: 2, diff --git a/response_header_test.go b/response_header_test.go index c7c68eae7..8a5daae1a 100644 --- a/response_header_test.go +++ b/response_header_test.go @@ -15,6 +15,7 @@ var ( ) func TestResponseHeaderV0(t *testing.T) { + t.Parallel() header := responseHeader{} testVersionDecodable(t, "response header", &header, responseHeaderBytesV0, 0) @@ -27,6 +28,7 @@ func TestResponseHeaderV0(t *testing.T) { } func TestResponseHeaderV1(t *testing.T) { + t.Parallel() header := responseHeader{} testVersionDecodable(t, "response header", &header, responseHeaderBytesV1, 1) diff --git a/sasl_authenticate_request_test.go b/sasl_authenticate_request_test.go index bf75004d2..ba3fe47e0 100644 --- a/sasl_authenticate_request_test.go +++ b/sasl_authenticate_request_test.go @@ -7,6 +7,7 @@ var saslAuthenticateRequest = []byte{ } func TestSaslAuthenticateRequest(t *testing.T) { + t.Parallel() request := new(SaslAuthenticateRequest) request.SaslAuthBytes = []byte(`foo`) testRequest(t, "basic", request, saslAuthenticateRequest) diff --git a/sasl_authenticate_response_test.go b/sasl_authenticate_response_test.go index 048dade19..018bdd35a 100644 --- a/sasl_authenticate_response_test.go +++ b/sasl_authenticate_response_test.go @@ -9,6 +9,7 @@ var saslAuthenticatResponseErr = []byte{ } func TestSaslAuthenticateResponse(t *testing.T) { + t.Parallel() response := new(SaslAuthenticateResponse) response.Err = ErrSASLAuthenticationFailed msg := "err" diff --git a/sasl_handshake_request_test.go b/sasl_handshake_request_test.go index e100ad5b9..5940c2471 100644 --- a/sasl_handshake_request_test.go +++ b/sasl_handshake_request_test.go @@ -7,6 +7,7 @@ var baseSaslRequest = []byte{ } func TestSaslHandshakeRequest(t *testing.T) { + t.Parallel() request := new(SaslHandshakeRequest) request.Mechanism = "foo" testRequest(t, "basic", request, baseSaslRequest) diff --git a/sasl_handshake_response_test.go b/sasl_handshake_response_test.go index 40441fd85..780bcf577 100644 --- a/sasl_handshake_response_test.go +++ b/sasl_handshake_response_test.go @@ -9,6 +9,7 @@ var saslHandshakeResponse = []byte{ } func TestSaslHandshakeResponse(t *testing.T) { + t.Parallel() response := new(SaslHandshakeResponse) testVersionDecodable(t, "no error", response, saslHandshakeResponse, 0) if response.Err != ErrNoError { diff --git a/scram_formatter_test.go b/scram_formatter_test.go index b673a6a7d..7c9f43bb3 100644 --- a/scram_formatter_test.go +++ b/scram_formatter_test.go @@ -36,6 +36,7 @@ public class App { */ func TestScramSaltedPasswordSha512(t *testing.T) { + t.Parallel() password := []byte("hello") salt := []byte("world") @@ -60,6 +61,7 @@ func TestScramSaltedPasswordSha512(t *testing.T) { } func TestScramSaltedPasswordSha256(t *testing.T) { + t.Parallel() password := []byte("hello") salt := []byte("world") diff --git a/sticky_assignor_user_data_test.go b/sticky_assignor_user_data_test.go index 9eb09a615..bca39be78 100644 --- a/sticky_assignor_user_data_test.go +++ b/sticky_assignor_user_data_test.go @@ -6,6 +6,7 @@ import ( ) func TestStickyAssignorUserDataV0(t *testing.T) { + t.Parallel() // Single topic with deterministic ordering across encode-decode req := &StickyAssignorUserDataV0{} data := decodeUserDataBytes(t, "AAAAAQADdDAzAAAAAQAAAAU=") @@ -24,6 +25,7 @@ func TestStickyAssignorUserDataV0(t *testing.T) { } func TestStickyAssignorUserDataV1(t *testing.T) { + t.Parallel() // Single topic with deterministic ordering across encode-decode req := &StickyAssignorUserDataV1{} data := decodeUserDataBytes(t, "AAAAAQADdDA2AAAAAgAAAAAAAAAE/////w==") diff --git a/sync_group_request_test.go b/sync_group_request_test.go index 3f537ef9f..f03ae5f83 100644 --- a/sync_group_request_test.go +++ b/sync_group_request_test.go @@ -21,6 +21,7 @@ var ( ) func TestSyncGroupRequest(t *testing.T) { + t.Parallel() var request *SyncGroupRequest request = new(SyncGroupRequest) diff --git a/sync_group_response_test.go b/sync_group_response_test.go index 6fb708858..85faec4ad 100644 --- a/sync_group_response_test.go +++ b/sync_group_response_test.go @@ -18,6 +18,7 @@ var ( ) func TestSyncGroupResponse(t *testing.T) { + t.Parallel() var response *SyncGroupResponse response = new(SyncGroupResponse) diff --git a/sync_producer_test.go b/sync_producer_test.go index 5f14f7be0..b59d872c8 100644 --- a/sync_producer_test.go +++ b/sync_producer_test.go @@ -7,6 +7,7 @@ import ( ) func TestSyncProducer(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) @@ -57,6 +58,7 @@ func TestSyncProducer(t *testing.T) { } func TestSyncProducerBatch(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) @@ -105,6 +107,7 @@ func TestSyncProducerBatch(t *testing.T) { } func TestConcurrentSyncProducer(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) @@ -149,6 +152,7 @@ func TestConcurrentSyncProducer(t *testing.T) { } func TestSyncProducerToNonExistingTopic(t *testing.T) { + t.Parallel() broker := NewMockBroker(t, 1) metadataResponse := new(MetadataResponse) @@ -180,6 +184,7 @@ func TestSyncProducerToNonExistingTopic(t *testing.T) { } func TestSyncProducerRecoveryWithRetriesDisabled(t *testing.T) { + t.Parallel() seedBroker := NewMockBroker(t, 1) leader1 := NewMockBroker(t, 2) leader2 := NewMockBroker(t, 3) diff --git a/txn_offset_commit_request_test.go b/txn_offset_commit_request_test.go index 714ec47eb..76f208ca6 100644 --- a/txn_offset_commit_request_test.go +++ b/txn_offset_commit_request_test.go @@ -16,6 +16,7 @@ var txnOffsetCommitRequest = []byte{ } func TestTxnOffsetCommitRequest(t *testing.T) { + t.Parallel() req := &TxnOffsetCommitRequest{ TransactionalID: "txn", GroupID: "groupid", diff --git a/txn_offset_commit_response_test.go b/txn_offset_commit_response_test.go index b4caa69b9..4f152fb77 100644 --- a/txn_offset_commit_response_test.go +++ b/txn_offset_commit_response_test.go @@ -15,6 +15,7 @@ var txnOffsetCommitResponse = []byte{ } func TestTxnOffsetCommitResponse(t *testing.T) { + t.Parallel() resp := &TxnOffsetCommitResponse{ ThrottleTime: 100 * time.Millisecond, Topics: map[string][]*PartitionError{ diff --git a/utils_test.go b/utils_test.go index 3d2c3ac89..a7448d07c 100644 --- a/utils_test.go +++ b/utils_test.go @@ -3,6 +3,7 @@ package sarama import "testing" func TestVersionCompare(t *testing.T) { + t.Parallel() if V0_8_2_0.IsAtLeast(V0_8_2_1) { t.Error("0.8.2.0 >= 0.8.2.1") } @@ -27,6 +28,7 @@ func TestVersionCompare(t *testing.T) { } func TestVersionParsing(t *testing.T) { + t.Parallel() validVersions := []string{ "0.8.2.0", "0.8.2.1",