Обычная версия
Java форум JavaTalks
форум программистов

Поиск   Пользователи   Группы   Регистрация 
 Профиль   Личные сообщения 

 Вход 

nonBlocking ограничение длины ConcurrentHashMap.Предложения?
Список форумов
 ->  Нити и процессы


На страницу 1, 2  След. 
Начать новую тему 
Предыдущая тема :: Следующая тема  
Автор Сообщение
alexey : 213
Новичок
Откуда: Спб

СообщениеНоя 08, 2011 15:17 
Ответить с цитатой
Допустим у вас есть задача - держать мапу размером, не превышающим заданный. И чтобы threadSafe, и без блокировок.

мой вариант
Код:

ConcurrentHashMap<Key, Value> map = ....;


void put(Key key, Value value) {

map.put(key, value);

//теперь проверяем, не превысили ли мы пределы. Если превысили, то удаляем наиболее подходящий под условия элемент.

   if (map.size() > maximumSize) {
            Value toDelete;
            do {
                toDelete= map.values().iterator().next();
                for (Value value : map.values()) {
                    if (определенные условия) {
                        toDelete= value; 
                    }
                }  //выбрали, кого удалить
            } while (map.size() > maximumSize &
!map.remove(toDelete.key, toDelete));   //как видно, value содержит свой ключ
}


Код действительно предотвращает мапу от переполнения, но иногда он удаляет больше чем надо. Т.е. нам нужно чтобы, мапа была не длиннее 5 элементов, а иногда при добавлении шестого вышеуказанный код удаляет больше чем один.

Как показал дебаг, это связано с неточным map.size() в условии while(). Иногда он показывает число, больше чем maximumSize, хотя на деле элементов ровно maximumSize, из-за чего удаление и происходит, оставляя maximumSize-1 элемент.

Как мне от этого избавиться?
Как решить изначально поставленную задачу?
_________________
Wink
К началу Посмотреть профиль Отправить личное сообщение ICQ Number
dok : 71
Новичок

СообщениеНоя 08, 2011 15:53 
Ответить с цитатой
Пронаследоваться от ConcurrentHashMap, переопределить put и сделать там поле AtomicInteger mySize, по которому сразу удалять при put. Ну и учесть remove в счетчике. Как-то так.
К началу Посмотреть профиль Отправить личное сообщение
Taky_ : 491
Бывалый

СообщениеНоя 08, 2011 16:07 
Ответить с цитатой
Может, я ошибаюсь, но попробую сформулировать идею:

Предположение: определенные условия можно выразить в порядке отношения. Тогда порядок удаления можно хранить синхронизированной приоритетной очереди. Очередь хранит ключи.

Также есть мапа со значениями. Перед тем как в нее что-то положить или удалить, проходим через очередь. Которая фактически требует, чтобы мы согласовали количество, а после уже добавили, удалили из мапы нужные элементы, таким образом приведя в порядок

Проблема в том чтобы делать offer очереди и удалять элемент из очереди синхронизировано. Как это сделать без блокировок, не знаю...
Введение в неблокирующие алгоритмы: http://www.ibm.com/developerworks/ru/library/j-jtp04186/
А вообще попробую подумать ещеSmile
К началу Посмотреть профиль Отправить личное сообщение
alexey : 213
Новичок
Откуда: Спб

СообщениеНоя 08, 2011 17:54 
Ответить с цитатой
с очередью я пробовал, но там без блокировок ваще не получается (

dok, думается мне, что не получится. ибо обновление счетчика и добавление нужно будет синхронизировать, а я хочу избежать блокировки.

хотя что-то мне вообще думается, что не обойтись без блокировки (
_________________
Wink
К началу Посмотреть профиль Отправить личное сообщение ICQ Number
Taky_ : 491
Бывалый

СообщениеНоя 08, 2011 18:17 
Ответить с цитатой
alexey писал(а):
dok, думается мне, что не получится. ибо обновление счетчика и добавление нужно будет синхронизировать, а я хочу избежать блокировки.


Предлагается использовать AtomicInteger. При работе со счетчиком синхронизацию нужна не будет.
Но проблема скорее все равно останется. Ибо то поведение, которое описали вы связанно скорее всего не из-за "кривой" реализации счетчика, а из-за не тривиальной реализации ConcurrentHashMap, которое не блокирует работу...
К началу Посмотреть профиль Отправить личное сообщение
Vurn : 1122
Java Developer

СообщениеНоя 08, 2011 20:13 
Ответить с цитатой
ConcurrentSkipListMap. Унаследуйтесь, измените логику. Сделайте так, чтобы при создании super делал нужный Вам компаратор, с помощью которого сможете быстро определять, какой элемент удалять. В класс добавьте AtomicInteger для быстрого контроля size(), потому как его встроенный size() требует много времени для вычисления. Переопределите методы вставки и удаления, которые будут модифицировать AtomicInteger, и при необходимости, удалять самый ненужный элемент по компаратару.

Последний раз редактировалось: Vurn (Ноя 08, 2011 20:22), всего редактировалось 1 раз
К началу Посмотреть профиль Отправить личное сообщение
alexey : 213
Новичок
Откуда: Спб

СообщениеНоя 08, 2011 20:18 
Ответить с цитатой
Taky_ писал(а):

Предлагается использовать AtomicInteger. При работе со счетчиком синхронизацию нужна не будет.


дык последовательное выполнение двух атомарных операций (увеличение счетчика и super.put и уменьшение счетчика и super.remove) - не является атомарными, ибо между этими двумя атомарными вызовами есть окно, куда могут вставиться другие. Скажем два раза вызвали put из разных потоков - оба смогли увеличить счетчик, но добавить не успели.
Это даст возможное расхождение значений счетчика и реального количество элементов в мапе (отчего я и хочу избавиться). Чтобы этого избежать нужно эти две операции выполнять под одним локом.
_________________
Wink
К началу Посмотреть профиль Отправить личное сообщение ICQ Number
dok : 71
Новичок

СообщениеНоя 08, 2011 22:25 
Ответить с цитатой
alexey писал(а):

дык последовательное выполнение двух атомарных операций (увеличение счетчика и super.put и уменьшение счетчика и super.remove) - не является атомарными, ибо между этими двумя атомарными вызовами есть окно, куда могут вставиться другие. Скажем два раза вызвали put из разных потоков - оба смогли увеличить счетчик, но добавить не успели.
Это даст возможное расхождение значений счетчика и реального количество элементов в мапе (отчего я и хочу избавиться).


Вроде рассинхронизации быть не должно, если сначала менять счётчик и по нему уже смотреть - есть переполнение или нет. Поскольку каждый put добавляет ровно 1 элемент, то если переполнение есть, то тред, который делал put должен удалить одно значение.

Алгоритм правда всё равно может потереть больше значений чем надо , если в параллель кто-то делает прямой вызов к remove. Хотя возможно это и корректно, ведь есть наложение операций по времени и на момент принятия решения делать/не делать trim() - его нужно было делать.

Возможно есть ещё ошибки, которые я проглядел. Поправляйте.

Есть мысли, что как-то с 2мя переменными можно организовать более чётко, но пока не придумал.

Ещё вроде можно сделать с compareAndSwap, но там в случае переполнения фактически один тред будет удалять, а остальные ждать в spin lock'е. Хотя если очередь не заполнена, то не будет contention.

Вот код для первого варианта.

Код:

package test;

import java.util.NoSuchElementException;
import java.util.Random;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;

public class Test2
{
    public static final AtomicInteger put = new AtomicInteger();
    public static final AtomicInteger removed = new AtomicInteger();

    public static void main(String[] args) throws Exception
    {
        final long endTime = System.currentTimeMillis() + 5000;
        final Random random = new Random();
        final MyMap map = new MyMap();

        class TestThread extends Thread
        {
            @Override
            public void run()
            {
                while (System.currentTimeMillis() < endTime)
                {
                    map.put(random.nextLong(), "a");
                }
            }
        }

        TestThread t1 = new TestThread();
        TestThread t2 = new TestThread();

        t1.start();
        t2.start();

        t1.join();
        t2.join();

        System.out.println("Result: " + map.size() + " - " + map.mySize());
        System.out.println("put: " + put.get() + " Removed: " + removed.get());
    }

    static class MyMap extends ConcurrentSkipListMap
    {
        public static final int MAX_SIZE = 10;
        private AtomicInteger size = new AtomicInteger();

        @Override
        public Object put(Object key, Object value)
        {
            put.incrementAndGet();
            if (size.incrementAndGet() > MAX_SIZE)
            {
                removeSingleElement();
            }
            return super.put(key, value);
        }

        private void removeSingleElement()
        {
            while (true)
            {
                Object firstKey;
                try
                {
                    firstKey = firstKey();
                    if (remove(firstKey) != null)
                    {
                        removed.incrementAndGet();
                        return;
                    }
                }
                catch (NoSuchElementException e)
                {
                    //ok
                }
            }
        }

        public int mySize()
        {
            return size.get();
        }

        @Override
        public Object remove(Object key)
        {
            Object removed = super.remove(key);
            if (removed != null)
            {
                size.decrementAndGet();
            }
            return removed;
        }
    }
}
К началу Посмотреть профиль Отправить личное сообщение
alexey : 213
Новичок
Откуда: Спб

СообщениеНоя 09, 2011 1:33 
Ответить с цитатой
спасибо за код и интерес к вопросу )

но все же не работает ваш код, как хотелось бы.

немного изменил main, добавил потоков одновременных и вывожу содержимое мапы.
Код:

 public static void main(String[] args) throws Exception {
        while (true) {
            final Random random = new Random();
            final MyMap map = new MyMap();

            ExecutorService service = Executors.newFixedThreadPool(10);

            for (int i = 0; i < 100; i++) {
                service.submit(new Runnable() {
                    public void run() {
                        map.put(random.nextInt(100), "a");
                    }
                });
            }

           
            Thread.sleep(1000);  //подождем, пока потоки закончат пихать в мапу всякую гадость.

            System.out.println("Result: " + map.size() + " - " + map.mySize());
            System.out.println("map = " + map);           
        }

    }


вывод программы:

Код:

Result: 2 - 10
map = {4=a, 98=a}
Result: 3 - 10
map = {16=a, 22=a, 87=a}
Result: 2 - 10
map = {65=a, 68=a}
Result: 4 - 10
map = {68=a, 70=a, 96=a, 99=a}
Result: 7 - 10
map = {74=a, 77=a, 81=a, 93=a, 94=a, 98=a, 99=a}
Result: 5 - 10
map = {33=a, 42=a, 50=a, 98=a, 99=a}
Result: 5 - 10
map = {66=a, 67=a, 87=a, 93=a, 94=a}
Result: 5 - 10
map = {51=a, 63=a, 97=a, 98=a, 99=a}


а надо чтобы 10 было. (MAX_SIZE)

дырка вот здесь:
Object removed = super.remove(key);
if (removed != null) {
size.decrementAndGet();
}

поток добавляет 11-ый элемент и, допустим, уже успел сделать super.remove, но не успел задекриментить size, а другой поток уже пришел в put, увидел сайз 11 и давай удалять элемент, хотя их уже 10 по факту. Таким образом удалим 2 вместо одного.
_________________
Wink
К началу Посмотреть профиль Отправить личное сообщение ICQ Number
abch-98-ru : 38
Новичок

СообщениеНоя 09, 2011 15:06 
Ответить с цитатой
alexey писал(а):

dok, думается мне, что не получится.
но все же не работает ваш код, как хотелось бы.

<imho> идея doc нормальная и без remove() реализуется просто. </imho>
Код:

    static class MyMap<K, V> implements Map<K, V> {
        public static final int MAX_SIZE = 10;

        private AtomicInteger size = new AtomicInteger();

        private ConcurrentSkipListMap<K, V> forwardMap = new ConcurrentSkipListMap<>();

        @Override
        public int size() {
            return forwardMap.size();
        }

        @Override
        public V put(K key, V value) {
            while (true) {
                if (size.incrementAndGet() <= MAX_SIZE) {
                    V result = forwardMap.put(key, value);
                    if (result != null) {
                        size.decrementAndGet();
                    }
                    return result;
                }
                while (size.decrementAndGet() >= MAX_SIZE && forwardMap.pollFirstEntry() != null) {
                }

            }
        }

        public int mySize() {
            return size.get();
        }

        @Generated("by ide")
        @Override
        public String toString() {
            return "MyMap{" +
                "size=" + size + " map.size()=" + forwardMap.size() +
                ", forwardMap=" + forwardMap +
                '}';
        }
      /* the other methods*/
    }

ограничения такого вроде понятны: без remove(), size короткое время будет некорректен.

c remove():
<imho> надо делать wrapper c маркером deleted и пыхтеть на маркерами при remove-ах, чтобы size декрементировать, только когда надо. </imho>
К началу Посмотреть профиль Отправить личное сообщение
alexey : 213
Новичок
Откуда: Спб

СообщениеНоя 09, 2011 17:13 
Ответить с цитатой
спасибо за вариант, но не пройдет:

Код:

Result: 10 - 10
map = MyMap{size=10 map.size()=10, forwardMap={60=a, 79=a, 88=a, 91=a, 92=a, 93=a, 95=a, 96=a, 97=a, 98=a}}
Result: 10 - 10
map = MyMap{size=10 map.size()=10, forwardMap={44=a, 60=a, 82=a, 85=a, 86=a, 88=a, 89=a, 91=a, 96=a, 98=a}}
Result: 10 - 10
map = MyMap{size=10 map.size()=10, forwardMap={9=a, 40=a, 47=a, 72=a, 89=a, 90=a, 92=a, 93=a, 96=a, 97=a}}
Result: 10 - 10
map = MyMap{size=10 map.size()=10, forwardMap={7=a, 45=a, 88=a, 90=a, 91=a, 92=a, 94=a, 95=a, 96=a, 97=a}}
Result: 9 - 9
map = MyMap{size=9 map.size()=9, forwardMap={23=a, 69=a, 91=a, 92=a, 93=a, 95=a, 96=a, 97=a, 99=a}}
Result: 9 - 9
map = MyMap{size=9 map.size()=9, forwardMap={13=a, 41=a, 89=a, 93=a, 95=a, 96=a, 97=a, 98=a, 99=a}}
Result: 9 - 9
map = MyMap{size=9 map.size()=9, forwardMap={39=a, 54=a, 86=a, 87=a, 92=a, 93=a, 94=a, 98=a, 99=a}}


я согласен на то, чтобы в короткое время в процессе работы размер был не 10, но в конце работы он должен быть 10.

здесь проблема в

if (size.incrementAndGet() <= MAX_SIZE) {
V result = forwardMap.put(key, value);
if (result != null) {
size.decrementAndGet();

один поток уже заменил значение, но еще не успел уменьшить обратно счетчик, а другой поток пошел в первый if, увеличил счетчик, не прошел условие и пошел дальше. (дальше он во втором while будет уменьшать, одновременно с этим первый поток уменьшит число, там еще замуты всякие Smile )
_________________
Wink
К началу Посмотреть профиль Отправить личное сообщение ICQ Number
abch-98-ru : 38
Новичок

СообщениеНоя 09, 2011 17:51 
Ответить с цитатой
alexey писал(а):
спасибо за вариант, но не пройдет:
я согласен на то, чтобы в короткое время в процессе работы размер был не 10, но в конце работы он должен быть 10.

почему же это? Wink
alexey писал(а):

держать мапу размером, не превышающим заданный.
Как решить изначально поставленную задачу?

по заданию всё кошер. )
или изначальная задача уже изменилась? Smile
К началу Посмотреть профиль Отправить личное сообщение
alexey : 213
Новичок
Откуда: Спб

СообщениеНоя 09, 2011 18:22 
Ответить с цитатой
ну да, поймал =)

нужно, чтобы не больше, но и лишнего не удалял. Т.е. если я запихал в мапу 100 разных значений, а мапа максимального размера 10, то я ожидаю в конце увидеть 10, а не 8 или 9.
_________________
Wink
К началу Посмотреть профиль Отправить личное сообщение ICQ Number
abch-98-ru : 38
Новичок

СообщениеНоя 09, 2011 19:33 
Ответить с цитатой
alexey писал(а):
нужно, чтобы не больше, но и лишнего не удалял. Т.е. если я запихал в мапу 100 разных значений, а мапа максимального размера 10, то я ожидаю в конце увидеть 10, а не 8 или 9.

не, без критической секции или какого-нить stm, это никак имхо. Smile
на два поля атомарных инструкций не знаю Embarassed Smile
К началу Посмотреть профиль Отправить личное сообщение
dok : 71
Новичок

СообщениеНоя 09, 2011 21:52 
Ответить с цитатой
Чего-то голова уже совсем не варит. Набросал ещё вариант, но ниасиливаю корректный ли он или нет Smile Так что давайте проверять вместе:

Код:

package test;

import java.util.NoSuchElementException;
import java.util.Random;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;

public class Test2
{
    public static final AtomicInteger put = new AtomicInteger();
    public static final AtomicInteger removed = new AtomicInteger();

    public static void main(String[] args) throws Exception
    {
        final long endTime = System.currentTimeMillis() + 5000;
        final Random random = new Random();
        final MyMap map = new MyMap();

        class TestThread extends Thread
        {
            @Override
            public void run()
            {
                while (System.currentTimeMillis() < endTime)
                {
                    map.put(random.nextLong(), "a");
                }
            }
        }

        TestThread t1 = new TestThread();
        TestThread t2 = new TestThread();

        t1.start();
        t2.start();

        t1.join();
        t2.join();

        System.out.println("Result: " + map.size() + " - " + map.mySize());
        System.out.println("put: " + put.get() + " Removed: " + removed.get());
    }

    static class MyMap extends ConcurrentSkipListMap
    {
        public static final int MAX_SIZE = 10;
        private AtomicInteger size = new AtomicInteger();

        @Override
        public Object put(Object key, Object value)
        {
            put.incrementAndGet();
            //we change size to the value it would be on if current operation would have been atomic
            while (true)
            {
                int currentSize = size.get();
                if (currentSize < MAX_SIZE)
                {
                    if (size.compareAndSet(currentSize, currentSize+1))
                    {
                        //alright - we can put value and not exceed the size
                        return super.put(key, value);
                    }
                }
                else if (currentSize == MAX_SIZE)
                {
                    //assume we do -1 and then +1 element so we don't change size during this operation
                    removeSingleElement();
                    return super.put(key, value);
                }
                else
                {
                    //never happens
                }
            }
        }

        private void removeSingleElement()
        {
            while (true)
            {
                Object firstKey;
                try
                {
                    firstKey = firstKey();
                    //do not decrement size counter here - simple trim
                    if (super.remove(firstKey) != null)
                    {
                        //todo handle case when map is already empty
                        removed.incrementAndGet();
                        return;
                    }
                }
                catch (NoSuchElementException e)
                {
                    //ok
                }
            }
        }

        public int mySize()
        {
            return size.get();
        }

        @Override
        public Object remove(Object key)
        {
            Object removed = super.remove(key);
            if (removed != null)
            {
                size.decrementAndGet();
            }
            return removed;
        }
    }
}
К началу Посмотреть профиль Отправить личное сообщение
 
Начать новую тему  Ответить на тему
Страница 1 из 2
На страницу 1, 2  След.
Список форумов
 -> Нити и процессы


 
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах


Java and all Java-related trademarks and logos are trademarks or registered trademarks of Oracle Corporation in the United States and other countries.
Это сайт не относится к фирме Oracle Corporation и не поддерживается ею.

© 2006-2010 www.javatalks.ru: форум java программистов
Используется скрипт phpBB © 2001, 2010 phpBB Group

Хостинг от bizname.ru