Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions pkg/odp/event/event_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func getRetryInterval(retryCount int) time.Duration {
type Manager interface {
// odpConfig is required here since it can be updated anytime and ticker needs to be aware of latest changes
Start(ctx context.Context, odpConfig config.Config)
IdentifyUser(apiKey, apiHost, userID string)
IdentifyUser(apiKey, apiHost string, identifiers map[string]string)
ProcessEvent(apiKey, apiHost string, odpEvent Event) error
FlushEvents(apiKey, apiHost string)
}
Expand Down Expand Up @@ -162,16 +162,31 @@ func (bm *BatchEventManager) Start(ctx context.Context, odpConfig config.Config)
}

// IdentifyUser associates a full-stack userid with an established VUID
func (bm *BatchEventManager) IdentifyUser(apiKey, apiHost, userID string) {
func (bm *BatchEventManager) IdentifyUser(apiKey, apiHost string, identifiers map[string]string) {
if !bm.IsOdpServiceIntegrated(apiKey, apiHost) {
bm.logger.Debug(utils.IdentityOdpNotIntegrated)
return
}
identifiers := map[string]string{utils.OdpFSUserIDKey: userID}

// Filter out empty identifier values
validIdentifiers := make(map[string]string)
for k, v := range identifiers {
if v != "" {
validIdentifiers[k] = v
}
}

// Identify requires 2+ identifiers to link (e.g., vuid + fs_user_id).
// A single identifier has no cross-reference value and generates unnecessary traffic.
if len(validIdentifiers) < 2 {
bm.logger.Debug("ODP identify event is not dispatched (fewer than 2 valid identifiers).")
return
}

odpEvent := Event{
Type: utils.OdpEventType,
Action: utils.OdpActionIdentified,
Identifiers: identifiers,
Identifiers: validIdentifiers,
}
_ = bm.ProcessEvent(apiKey, apiHost, odpEvent)
}
Expand Down
28 changes: 22 additions & 6 deletions pkg/odp/event/event_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,18 +178,19 @@ func (e *EventManagerTestSuite) TestEventsDispatchedWhenFlushIntervalReached() {
}

func (e *EventManagerTestSuite) TestIdentifyUserWhenODPNotIntegrated() {
e.eventManager.IdentifyUser("", "1", "123")
identifiers := map[string]string{utils.OdpFSUserIDKey: "123", "vuid": "vuid-123"}
e.eventManager.IdentifyUser("", "1", identifiers)
e.Nil(e.eventManager.ticker)
e.Equal(0, e.eventAPIManager.timesSendEventsCalled)
}

func (e *EventManagerTestSuite) TestIdentifyUserWhenODPIntegrated() {
userID := "123"
expectedEvent := Event{Identifiers: map[string]string{utils.OdpFSUserIDKey: userID}, Type: utils.OdpEventType, Action: utils.OdpActionIdentified}
func (e *EventManagerTestSuite) TestIdentifyUserWhenODPIntegratedWithTwoIdentifiers() {
identifiers := map[string]string{utils.OdpFSUserIDKey: "123", "vuid": "vuid-456"}
expectedEvent := Event{Identifiers: identifiers, Type: utils.OdpEventType, Action: utils.OdpActionIdentified}
e.eventManager.addCommonData(&expectedEvent)
e.eventAPIManager.wg.Add(1)
e.eventManager.batchSize = 1
e.eventManager.IdentifyUser("1", "2", userID)
e.eventManager.IdentifyUser("1", "2", identifiers)
e.eventAPIManager.wg.Wait()
e.Equal(1, e.eventAPIManager.timesSendEventsCalled)

Expand All @@ -200,6 +201,20 @@ func (e *EventManagerTestSuite) TestIdentifyUserWhenODPIntegrated() {
e.Equal(expectedEvent, actualEvent)
}

func (e *EventManagerTestSuite) TestIdentifyUserSkippedWithSingleIdentifier() {
identifiers := map[string]string{utils.OdpFSUserIDKey: "123"}
e.eventManager.IdentifyUser("1", "2", identifiers)
e.Equal(0, e.eventAPIManager.timesSendEventsCalled)
e.Equal(0, e.eventManager.eventQueue.Size())
}

func (e *EventManagerTestSuite) TestIdentifyUserSkippedWithEmptyValues() {
identifiers := map[string]string{utils.OdpFSUserIDKey: "123", "vuid": ""}
e.eventManager.IdentifyUser("1", "2", identifiers)
e.Equal(0, e.eventAPIManager.timesSendEventsCalled)
e.Equal(0, e.eventManager.eventQueue.Size())
}

func (e *EventManagerTestSuite) TestProcessEventWithInvalidODPConfig() {
em := NewBatchEventManager(WithAPIManager(&MockEventAPIManager{}))
e.Error(em.ProcessEvent("", "", Event{Action: "123"}))
Expand Down Expand Up @@ -442,7 +457,8 @@ func (e *EventManagerTestSuite) TestEventManagerAsyncBehaviour() {
eventAPIManager.shouldNotInformWaitgroup = true
eg := newExecutionContext()
callAllMethods := func(id string) {
eventManager.IdentifyUser("-1", "-1", id)
identifiers := map[string]string{utils.OdpFSUserIDKey: id, "vuid": "vuid-" + id}
eventManager.IdentifyUser("-1", "-1", identifiers)
eventManager.ProcessEvent("-1", "-1", Event{Action: "123"})
}
for i := 0; i < iterations; i++ {
Expand Down
10 changes: 9 additions & 1 deletion pkg/odp/odp_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ type Manager interface {
Update(apiKey, apiHost string, segmentsToCheck []string)
}

// identifyUserIdentifiers builds the identifiers map for an identify event.
// Server-side SDKs only have fs_user_id (no VUID), so identify events
// will be skipped by the event manager's count check (requires 2+ identifiers).
func identifyUserIdentifiers(userID string) map[string]string {
return map[string]string{utils.OdpFSUserIDKey: userID}
}

// DefaultOdpManager represents default implementation of odp manager
type DefaultOdpManager struct {
enabled bool
Expand Down Expand Up @@ -141,7 +148,8 @@ func (om *DefaultOdpManager) IdentifyUser(userID string) {
om.logger.Debug(utils.IdentityOdpDisabled)
return
}
om.EventManager.IdentifyUser(om.OdpConfig.GetAPIKey(), om.OdpConfig.GetAPIHost(), userID)
identifiers := identifyUserIdentifiers(userID)
om.EventManager.IdentifyUser(om.OdpConfig.GetAPIKey(), om.OdpConfig.GetAPIHost(), identifiers)
}

// SendOdpEvent sends an event to the ODP server.
Expand Down
7 changes: 4 additions & 3 deletions pkg/odp/odp_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func (m *MockEventManager) Start(ctx context.Context, odpConfig config.Config) {
m.Called(ctx, odpConfig)
}

func (m *MockEventManager) IdentifyUser(apiKey, apiHost, userID string) {
m.Called(apiKey, apiHost, userID)
func (m *MockEventManager) IdentifyUser(apiKey, apiHost string, identifiers map[string]string) {
m.Called(apiKey, apiHost, identifiers)
}

func (m *MockEventManager) ProcessEvent(apiKey, apiHost string, odpEvent event.Event) error {
Expand Down Expand Up @@ -192,7 +192,8 @@ func (o *ODPManagerTestSuite) TestFetchQualifiedSegments() {
func (o *ODPManagerTestSuite) TestIdentifyUser() {
o.config.On("GetAPIKey").Return("")
o.config.On("GetAPIHost").Return("")
o.eventManager.On("IdentifyUser", "", "", o.userID)
expectedIdentifiers := map[string]string{utils.OdpFSUserIDKey: o.userID}
o.eventManager.On("IdentifyUser", "", "", expectedIdentifiers)
o.odpManager.IdentifyUser(o.userID)
o.segmentManager.AssertExpectations(o.T())
}
Expand Down
Loading