Bug 530681 - Free up memory occupied by command when using asynchronous command propagation for cluster coordination

Signed-off-by: Tomas Kraus <tomas.kraus@oracle.com>
Reviewed-by: Lukas Jungmann <lukas.jungmann@oracle.com>
diff --git a/foundation/org.eclipse.persistence.core/src/org/eclipse/persistence/internal/sessions/ObjectChangeSet.java b/foundation/org.eclipse.persistence.core/src/org/eclipse/persistence/internal/sessions/ObjectChangeSet.java
index 1955141..217ef41 100644
--- a/foundation/org.eclipse.persistence.core/src/org/eclipse/persistence/internal/sessions/ObjectChangeSet.java
+++ b/foundation/org.eclipse.persistence.core/src/org/eclipse/persistence/internal/sessions/ObjectChangeSet.java
@@ -1118,7 +1118,6 @@
      * Helper method to readObject.  Completely write this ObjectChangeSet to the stream
      */
     public void writeCompleteChangeSet(java.io.ObjectOutputStream stream) throws java.io.IOException {
-        ensureChanges();
         writeIdentityInformation(stream);
         stream.writeObject(this.changes);
         stream.writeObject(this.oldKey);
@@ -1128,34 +1127,6 @@
 
     /**
      * INTERNAL:
-     * Ensure the change set is populated for cache coordination.
-     */
-    public void ensureChanges() {
-        if (this.isNew && ((this.changes == null) || this.changes.isEmpty() || cacheSynchronizationType != ClassDescriptor.SEND_NEW_OBJECTS_WITH_CHANGES)) {
-            AbstractSession unitOfWork = this.unitOfWorkChangeSet.getSession();
-            // Full change set is only required for cache coordination, not remote.
-            if (unitOfWork != null && !unitOfWork.isRemoteUnitOfWork()) {
-                ClassDescriptor descriptor = getDescriptor();
-                if (descriptor != null) {
-                    FetchGroup fetchGroup = null;
-                    if(descriptor.hasFetchGroupManager()) {
-                        fetchGroup = descriptor.getFetchGroupManager().getObjectFetchGroup(this.cloneObject);
-                    }
-                    List mappings = descriptor.getMappings();
-                    int mappingsSize = mappings.size();
-                    for (int index = 0; index < mappingsSize; index++) {
-                        DatabaseMapping mapping = (DatabaseMapping)mappings.get(index);
-                        if (fetchGroup == null || fetchGroup.containsAttributeInternal(mapping.getAttributeName())) {
-                            addChange(mapping.compareForChange(this.cloneObject, this.cloneObject, this, unitOfWork));
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * INTERNAL:
      * Reset the change set's transient variables after serialization.
      */
     public void postSerialize(Object clone, UnitOfWorkChangeSet uowChangeSet, AbstractSession session) {
diff --git a/foundation/org.eclipse.persistence.core/src/org/eclipse/persistence/internal/sessions/UnitOfWorkChangeSet.java b/foundation/org.eclipse.persistence.core/src/org/eclipse/persistence/internal/sessions/UnitOfWorkChangeSet.java
index 2cb0234..2432742 100644
--- a/foundation/org.eclipse.persistence.core/src/org/eclipse/persistence/internal/sessions/UnitOfWorkChangeSet.java
+++ b/foundation/org.eclipse.persistence.core/src/org/eclipse/persistence/internal/sessions/UnitOfWorkChangeSet.java
@@ -1,5 +1,5 @@
 /*******************************************************************************
- * Copyright (c) 1998, 2016 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 1998, 2018 Oracle and/or its affiliates. All rights reserved.
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 and Eclipse Distribution License v. 1.0
  * which accompanies this distribution.
@@ -17,12 +17,15 @@
 import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import org.eclipse.persistence.descriptors.ClassDescriptor;
 import org.eclipse.persistence.internal.identitymaps.CacheId;
 import org.eclipse.persistence.internal.identitymaps.CacheKey;
+import org.eclipse.persistence.mappings.DatabaseMapping;
+import org.eclipse.persistence.queries.FetchGroup;
 
 /**
  * <p>
@@ -311,6 +314,8 @@
      * INTERNAL:
      * Return a new UnitOfWorkChangeSet that only includes data require for the remote merge,
      * for cache coordination.
+     *
+     * @param session current database session
      */
     public UnitOfWorkChangeSet buildCacheCoordinationMergeChangeSet(AbstractSession session) {
         //bug 4416412: Map sent instead of Vector
@@ -334,8 +339,14 @@
             // Note: New objects could still be sent if the are referred to by a change record.
             if ((syncType != ClassDescriptor.DO_NOT_SEND_CHANGES)
                     && (!changeSet.isNew() || (syncType == ClassDescriptor.SEND_NEW_OBJECTS_WITH_CHANGES))) {
+                changeSet.unitOfWorkChangeSet.setSession(null);
                 writableChangeSets.put(changeSet, changeSet);
             }
+            // bug 530681: ensureChanges(AbstractSession, ObjectChangeSet, ClassDescriptor) from ObjectChangeSet was moved here
+            if (changeSet.isNew() && ((changeSet.changes == null) || changeSet.changes.isEmpty()
+                    || syncType != ClassDescriptor.SEND_NEW_OBJECTS_WITH_CHANGES)) {
+                ensureChanges(session, changeSet, descriptor);
+            }
         }
         Map sendableDeletedObjects = new IdentityHashMap();
         for (ObjectChangeSet changeSet : getDeletedObjects().keySet()) {
@@ -347,6 +358,7 @@
             // if they are meant to be merged into the distributed cache.
             // Note: New objects could still be sent if the are referred to by a change record.
             if (syncType != ClassDescriptor.DO_NOT_SEND_CHANGES) {
+                changeSet.unitOfWorkChangeSet.setSession(null);
                 sendableDeletedObjects.put(changeSet, changeSet);
             }
         }
@@ -366,6 +378,25 @@
     }
 
     /**
+     * Ensure the change set is populated for cache coordination.
+     *
+     * @param session current database session
+     * @param changeSet change set to populate
+     * @param descriptor class (relational) descriptor related to the change set
+     */
+    private void ensureChanges(final AbstractSession session, final ObjectChangeSet changeSet, final ClassDescriptor descriptor) {
+        FetchGroup fetchGroup = null;
+        if (descriptor.hasFetchGroupManager()) {
+            fetchGroup = descriptor.getFetchGroupManager().getObjectFetchGroup(changeSet.cloneObject);
+        }
+        for (DatabaseMapping mapping : descriptor.getMappings()) {
+            if (fetchGroup == null || fetchGroup.containsAttributeInternal(mapping.getAttributeName())) {
+                changeSet.addChange(mapping.compareForChange(changeSet.cloneObject, changeSet.cloneObject, changeSet, session));
+            }
+        }
+    }
+
+    /**
      * INTERNAL:
      * Get the clone to object change hash table.  Lazy initializes the map if required.
      */
diff --git a/jpa/eclipselink.jpa.test/src/org/eclipse/persistence/testing/tests/jpa/advanced/JPARCMLocalChangeSetTestSuite.java b/jpa/eclipselink.jpa.test/src/org/eclipse/persistence/testing/tests/jpa/advanced/JPARCMLocalChangeSetTestSuite.java
index 4e7b6f3..224ddbd 100644
--- a/jpa/eclipselink.jpa.test/src/org/eclipse/persistence/testing/tests/jpa/advanced/JPARCMLocalChangeSetTestSuite.java
+++ b/jpa/eclipselink.jpa.test/src/org/eclipse/persistence/testing/tests/jpa/advanced/JPARCMLocalChangeSetTestSuite.java
@@ -1,5 +1,5 @@
 /*******************************************************************************
- * Copyright (c) 1998, 2016 Oracle and/or its affiliates, IBM Corporation. All rights reserved.
+ * Copyright (c) 1998, 2018 Oracle and/or its affiliates, IBM Corporation. All rights reserved.
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 and Eclipse Distribution License v. 1.0
  * which accompanies this distribution.
@@ -13,8 +13,6 @@
 package org.eclipse.persistence.testing.tests.jpa.advanced;
 
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -22,12 +20,6 @@
 
 import javax.persistence.EntityManager;
 
-import junit.framework.Test;
-import junit.framework.TestSuite;
-
-import org.eclipse.persistence.sessions.server.ServerSession;
-import org.eclipse.persistence.config.CacheIsolationType;
-import org.eclipse.persistence.descriptors.CachePolicy;
 import org.eclipse.persistence.descriptors.ClassDescriptor;
 import org.eclipse.persistence.exceptions.CommunicationException;
 import org.eclipse.persistence.internal.helper.DatabaseField;
@@ -46,15 +38,18 @@
 import org.eclipse.persistence.sessions.coordination.ServiceId;
 import org.eclipse.persistence.sessions.coordination.TransportManager;
 import org.eclipse.persistence.sessions.serializers.JavaSerializer;
+import org.eclipse.persistence.sessions.server.ServerSession;
 import org.eclipse.persistence.testing.framework.junit.JUnitTestCase;
 import org.eclipse.persistence.testing.models.jpa.advanced.Address;
 import org.eclipse.persistence.testing.models.jpa.advanced.AdvancedTableCreator;
 import org.eclipse.persistence.testing.models.jpa.advanced.Employee;
-import org.eclipse.persistence.testing.models.jpa.advanced.EmployeePopulator;
 import org.eclipse.persistence.testing.models.jpa.cacheable.CacheableFalseEntity;
 import org.eclipse.persistence.testing.models.jpa.cacheable.CacheableForceProtectedEntity;
 import org.eclipse.persistence.testing.models.jpa.cacheable.CacheableTableCreator;
 
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
 /**
  * JPARCMLocalChangeSetTestSuite
  * Simple low resource/setup JPA test suite & framework allowing for local 
@@ -469,6 +464,10 @@
             allowForChangePropagation();
             
             LocalConnection conn = getLocalConnection(session);
+            List<UnitOfWorkChangeSet> changeSets = conn.getReceivedChangeSets();
+            for (UnitOfWorkChangeSet changeSet : changeSets) {
+                assertNull(changeSet.getSession());
+            }
             assertEquals("Should have received one UnitOfWorkChangeSet", 1, conn.getReceivedChangeSets().size());
             UnitOfWorkChangeSet uowcs = conn.getReceivedChangeSets().get(0);