|
Java форум JavaTalks форум программистов
|
|
|
|
| Предыдущая тема :: Следующая тема |
| Автор |
Сообщение |
alexey : 213 Новичок Откуда: Спб
|
Ноя 08, 2011 15:17 |
|
|
Допустим у вас есть задача - держать мапу размером, не превышающим заданный. И чтобы threadSafe, и без блокировок.
мой вариант
| Код: |
ConcurrentHashMap<Key, Value> map = ....;
void put(Key key, Value value) {
map.put(key, value);
//теперь проверяем, не превысили ли мы пределы. Если превысили, то удаляем наиболее подходящий под условия элемент.
if (map.size() > maximumSize) {
Value toDelete;
do {
toDelete= map.values().iterator().next();
for (Value value : map.values()) {
if (определенные условия) {
toDelete= value;
}
} //выбрали, кого удалить
} while (map.size() > maximumSize &
!map.remove(toDelete.key, toDelete)); //как видно, value содержит свой ключ
}
|
Код действительно предотвращает мапу от переполнения, но иногда он удаляет больше чем надо. Т.е. нам нужно чтобы, мапа была не длиннее 5 элементов, а иногда при добавлении шестого вышеуказанный код удаляет больше чем один.
Как показал дебаг, это связано с неточным map.size() в условии while(). Иногда он показывает число, больше чем maximumSize, хотя на деле элементов ровно maximumSize, из-за чего удаление и происходит, оставляя maximumSize-1 элемент.
Как мне от этого избавиться?
Как решить изначально поставленную задачу? _________________
 |
|
|
|
 |
dok : 71 Новичок
|
Ноя 08, 2011 15:53 |
|
|
| Пронаследоваться от ConcurrentHashMap, переопределить put и сделать там поле AtomicInteger mySize, по которому сразу удалять при put. Ну и учесть remove в счетчике. Как-то так. |
|
|
|
 |
Taky_ : 491 Бывалый
|
Ноя 08, 2011 16:07 |
|
|
Может, я ошибаюсь, но попробую сформулировать идею:
Предположение: определенные условия можно выразить в порядке отношения. Тогда порядок удаления можно хранить синхронизированной приоритетной очереди. Очередь хранит ключи.
Также есть мапа со значениями. Перед тем как в нее что-то положить или удалить, проходим через очередь. Которая фактически требует, чтобы мы согласовали количество, а после уже добавили, удалили из мапы нужные элементы, таким образом приведя в порядок
Проблема в том чтобы делать offer очереди и удалять элемент из очереди синхронизировано. Как это сделать без блокировок, не знаю...
Введение в неблокирующие алгоритмы: http://www.ibm.com/developerworks/ru/library/j-jtp04186/
А вообще попробую подумать еще |
|
|
|
 |
alexey : 213 Новичок Откуда: Спб
|
Ноя 08, 2011 17:54 |
|
|
с очередью я пробовал, но там без блокировок ваще не получается (
dok, думается мне, что не получится. ибо обновление счетчика и добавление нужно будет синхронизировать, а я хочу избежать блокировки.
хотя что-то мне вообще думается, что не обойтись без блокировки ( _________________
 |
|
|
|
 |
Taky_ : 491 Бывалый
|
Ноя 08, 2011 18:17 |
|
|
| alexey писал(а): |
dok, думается мне, что не получится. ибо обновление счетчика и добавление нужно будет синхронизировать, а я хочу избежать блокировки.
|
Предлагается использовать AtomicInteger. При работе со счетчиком синхронизацию нужна не будет.
Но проблема скорее все равно останется. Ибо то поведение, которое описали вы связанно скорее всего не из-за "кривой" реализации счетчика, а из-за не тривиальной реализации ConcurrentHashMap, которое не блокирует работу... |
|
|
|
 |
Vurn : 1122 Java Developer
|
Ноя 08, 2011 20:13 |
|
|
ConcurrentSkipListMap. Унаследуйтесь, измените логику. Сделайте так, чтобы при создании super делал нужный Вам компаратор, с помощью которого сможете быстро определять, какой элемент удалять. В класс добавьте AtomicInteger для быстрого контроля size(), потому как его встроенный size() требует много времени для вычисления. Переопределите методы вставки и удаления, которые будут модифицировать AtomicInteger, и при необходимости, удалять самый ненужный элемент по компаратару.
Последний раз редактировалось: Vurn (Ноя 08, 2011 20:22), всего редактировалось 1 раз |
|
|
|
 |
alexey : 213 Новичок Откуда: Спб
|
Ноя 08, 2011 20:18 |
|
|
| Taky_ писал(а): |
Предлагается использовать AtomicInteger. При работе со счетчиком синхронизацию нужна не будет. |
дык последовательное выполнение двух атомарных операций (увеличение счетчика и super.put и уменьшение счетчика и super.remove) - не является атомарными, ибо между этими двумя атомарными вызовами есть окно, куда могут вставиться другие. Скажем два раза вызвали put из разных потоков - оба смогли увеличить счетчик, но добавить не успели.
Это даст возможное расхождение значений счетчика и реального количество элементов в мапе (отчего я и хочу избавиться). Чтобы этого избежать нужно эти две операции выполнять под одним локом. _________________
 |
|
|
|
 |
dok : 71 Новичок
|
Ноя 08, 2011 22:25 |
|
|
| alexey писал(а): |
дык последовательное выполнение двух атомарных операций (увеличение счетчика и super.put и уменьшение счетчика и super.remove) - не является атомарными, ибо между этими двумя атомарными вызовами есть окно, куда могут вставиться другие. Скажем два раза вызвали put из разных потоков - оба смогли увеличить счетчик, но добавить не успели.
Это даст возможное расхождение значений счетчика и реального количество элементов в мапе (отчего я и хочу избавиться). |
Вроде рассинхронизации быть не должно, если сначала менять счётчик и по нему уже смотреть - есть переполнение или нет. Поскольку каждый put добавляет ровно 1 элемент, то если переполнение есть, то тред, который делал put должен удалить одно значение.
Алгоритм правда всё равно может потереть больше значений чем надо , если в параллель кто-то делает прямой вызов к remove. Хотя возможно это и корректно, ведь есть наложение операций по времени и на момент принятия решения делать/не делать trim() - его нужно было делать.
Возможно есть ещё ошибки, которые я проглядел. Поправляйте.
Есть мысли, что как-то с 2мя переменными можно организовать более чётко, но пока не придумал.
Ещё вроде можно сделать с compareAndSwap, но там в случае переполнения фактически один тред будет удалять, а остальные ждать в spin lock'е. Хотя если очередь не заполнена, то не будет contention.
Вот код для первого варианта.
| Код: |
package test;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
public class Test2
{
public static final AtomicInteger put = new AtomicInteger();
public static final AtomicInteger removed = new AtomicInteger();
public static void main(String[] args) throws Exception
{
final long endTime = System.currentTimeMillis() + 5000;
final Random random = new Random();
final MyMap map = new MyMap();
class TestThread extends Thread
{
@Override
public void run()
{
while (System.currentTimeMillis() < endTime)
{
map.put(random.nextLong(), "a");
}
}
}
TestThread t1 = new TestThread();
TestThread t2 = new TestThread();
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Result: " + map.size() + " - " + map.mySize());
System.out.println("put: " + put.get() + " Removed: " + removed.get());
}
static class MyMap extends ConcurrentSkipListMap
{
public static final int MAX_SIZE = 10;
private AtomicInteger size = new AtomicInteger();
@Override
public Object put(Object key, Object value)
{
put.incrementAndGet();
if (size.incrementAndGet() > MAX_SIZE)
{
removeSingleElement();
}
return super.put(key, value);
}
private void removeSingleElement()
{
while (true)
{
Object firstKey;
try
{
firstKey = firstKey();
if (remove(firstKey) != null)
{
removed.incrementAndGet();
return;
}
}
catch (NoSuchElementException e)
{
//ok
}
}
}
public int mySize()
{
return size.get();
}
@Override
public Object remove(Object key)
{
Object removed = super.remove(key);
if (removed != null)
{
size.decrementAndGet();
}
return removed;
}
}
}
|
|
|
|
|
 |
alexey : 213 Новичок Откуда: Спб
|
Ноя 09, 2011 1:33 |
|
|
спасибо за код и интерес к вопросу )
но все же не работает ваш код, как хотелось бы.
немного изменил main, добавил потоков одновременных и вывожу содержимое мапы.
| Код: |
public static void main(String[] args) throws Exception {
while (true) {
final Random random = new Random();
final MyMap map = new MyMap();
ExecutorService service = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
service.submit(new Runnable() {
public void run() {
map.put(random.nextInt(100), "a");
}
});
}
Thread.sleep(1000); //подождем, пока потоки закончат пихать в мапу всякую гадость.
System.out.println("Result: " + map.size() + " - " + map.mySize());
System.out.println("map = " + map);
}
}
|
вывод программы:
| Код: |
Result: 2 - 10
map = {4=a, 98=a}
Result: 3 - 10
map = {16=a, 22=a, 87=a}
Result: 2 - 10
map = {65=a, 68=a}
Result: 4 - 10
map = {68=a, 70=a, 96=a, 99=a}
Result: 7 - 10
map = {74=a, 77=a, 81=a, 93=a, 94=a, 98=a, 99=a}
Result: 5 - 10
map = {33=a, 42=a, 50=a, 98=a, 99=a}
Result: 5 - 10
map = {66=a, 67=a, 87=a, 93=a, 94=a}
Result: 5 - 10
map = {51=a, 63=a, 97=a, 98=a, 99=a}
|
а надо чтобы 10 было. (MAX_SIZE)
дырка вот здесь:
Object removed = super.remove(key);
if (removed != null) {
size.decrementAndGet();
}
поток добавляет 11-ый элемент и, допустим, уже успел сделать super.remove, но не успел задекриментить size, а другой поток уже пришел в put, увидел сайз 11 и давай удалять элемент, хотя их уже 10 по факту. Таким образом удалим 2 вместо одного. _________________
 |
|
|
|
 |
abch-98-ru : 38 Новичок
|
Ноя 09, 2011 15:06 |
|
|
| alexey писал(а): |
dok, думается мне, что не получится.
но все же не работает ваш код, как хотелось бы.
|
<imho> идея doc нормальная и без remove() реализуется просто. </imho>
| Код: |
static class MyMap<K, V> implements Map<K, V> {
public static final int MAX_SIZE = 10;
private AtomicInteger size = new AtomicInteger();
private ConcurrentSkipListMap<K, V> forwardMap = new ConcurrentSkipListMap<>();
@Override
public int size() {
return forwardMap.size();
}
@Override
public V put(K key, V value) {
while (true) {
if (size.incrementAndGet() <= MAX_SIZE) {
V result = forwardMap.put(key, value);
if (result != null) {
size.decrementAndGet();
}
return result;
}
while (size.decrementAndGet() >= MAX_SIZE && forwardMap.pollFirstEntry() != null) {
}
}
}
public int mySize() {
return size.get();
}
@Generated("by ide")
@Override
public String toString() {
return "MyMap{" +
"size=" + size + " map.size()=" + forwardMap.size() +
", forwardMap=" + forwardMap +
'}';
}
/* the other methods*/
}
|
ограничения такого вроде понятны: без remove(), size короткое время будет некорректен.
c remove():
<imho> надо делать wrapper c маркером deleted и пыхтеть на маркерами при remove-ах, чтобы size декрементировать, только когда надо. </imho> |
|
|
|
 |
alexey : 213 Новичок Откуда: Спб
|
Ноя 09, 2011 17:13 |
|
|
спасибо за вариант, но не пройдет:
| Код: |
Result: 10 - 10
map = MyMap{size=10 map.size()=10, forwardMap={60=a, 79=a, 88=a, 91=a, 92=a, 93=a, 95=a, 96=a, 97=a, 98=a}}
Result: 10 - 10
map = MyMap{size=10 map.size()=10, forwardMap={44=a, 60=a, 82=a, 85=a, 86=a, 88=a, 89=a, 91=a, 96=a, 98=a}}
Result: 10 - 10
map = MyMap{size=10 map.size()=10, forwardMap={9=a, 40=a, 47=a, 72=a, 89=a, 90=a, 92=a, 93=a, 96=a, 97=a}}
Result: 10 - 10
map = MyMap{size=10 map.size()=10, forwardMap={7=a, 45=a, 88=a, 90=a, 91=a, 92=a, 94=a, 95=a, 96=a, 97=a}}
Result: 9 - 9
map = MyMap{size=9 map.size()=9, forwardMap={23=a, 69=a, 91=a, 92=a, 93=a, 95=a, 96=a, 97=a, 99=a}}
Result: 9 - 9
map = MyMap{size=9 map.size()=9, forwardMap={13=a, 41=a, 89=a, 93=a, 95=a, 96=a, 97=a, 98=a, 99=a}}
Result: 9 - 9
map = MyMap{size=9 map.size()=9, forwardMap={39=a, 54=a, 86=a, 87=a, 92=a, 93=a, 94=a, 98=a, 99=a}}
|
я согласен на то, чтобы в короткое время в процессе работы размер был не 10, но в конце работы он должен быть 10.
здесь проблема в
if (size.incrementAndGet() <= MAX_SIZE) {
V result = forwardMap.put(key, value);
if (result != null) {
size.decrementAndGet();
один поток уже заменил значение, но еще не успел уменьшить обратно счетчик, а другой поток пошел в первый if, увеличил счетчик, не прошел условие и пошел дальше. (дальше он во втором while будет уменьшать, одновременно с этим первый поток уменьшит число, там еще замуты всякие ) _________________
 |
|
|
|
 |
abch-98-ru : 38 Новичок
|
Ноя 09, 2011 17:51 |
|
|
| alexey писал(а): |
спасибо за вариант, но не пройдет:
я согласен на то, чтобы в короткое время в процессе работы размер был не 10, но в конце работы он должен быть 10.
|
почему же это?
| alexey писал(а): |
держать мапу размером, не превышающим заданный.
Как решить изначально поставленную задачу?
|
по заданию всё кошер. )
или изначальная задача уже изменилась?  |
|
|
|
 |
alexey : 213 Новичок Откуда: Спб
|
Ноя 09, 2011 18:22 |
|
|
ну да, поймал =)
нужно, чтобы не больше, но и лишнего не удалял. Т.е. если я запихал в мапу 100 разных значений, а мапа максимального размера 10, то я ожидаю в конце увидеть 10, а не 8 или 9. _________________
 |
|
|
|
 |
abch-98-ru : 38 Новичок
|
Ноя 09, 2011 19:33 |
|
|
| alexey писал(а): |
| нужно, чтобы не больше, но и лишнего не удалял. Т.е. если я запихал в мапу 100 разных значений, а мапа максимального размера 10, то я ожидаю в конце увидеть 10, а не 8 или 9. |
не, без критической секции или какого-нить stm, это никак имхо.
на два поля атомарных инструкций не знаю  |
|
|
|
 |
dok : 71 Новичок
|
Ноя 09, 2011 21:52 |
|
|
Чего-то голова уже совсем не варит. Набросал ещё вариант, но ниасиливаю корректный ли он или нет Так что давайте проверять вместе:
| Код: |
package test;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
public class Test2
{
public static final AtomicInteger put = new AtomicInteger();
public static final AtomicInteger removed = new AtomicInteger();
public static void main(String[] args) throws Exception
{
final long endTime = System.currentTimeMillis() + 5000;
final Random random = new Random();
final MyMap map = new MyMap();
class TestThread extends Thread
{
@Override
public void run()
{
while (System.currentTimeMillis() < endTime)
{
map.put(random.nextLong(), "a");
}
}
}
TestThread t1 = new TestThread();
TestThread t2 = new TestThread();
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Result: " + map.size() + " - " + map.mySize());
System.out.println("put: " + put.get() + " Removed: " + removed.get());
}
static class MyMap extends ConcurrentSkipListMap
{
public static final int MAX_SIZE = 10;
private AtomicInteger size = new AtomicInteger();
@Override
public Object put(Object key, Object value)
{
put.incrementAndGet();
//we change size to the value it would be on if current operation would have been atomic
while (true)
{
int currentSize = size.get();
if (currentSize < MAX_SIZE)
{
if (size.compareAndSet(currentSize, currentSize+1))
{
//alright - we can put value and not exceed the size
return super.put(key, value);
}
}
else if (currentSize == MAX_SIZE)
{
//assume we do -1 and then +1 element so we don't change size during this operation
removeSingleElement();
return super.put(key, value);
}
else
{
//never happens
}
}
}
private void removeSingleElement()
{
while (true)
{
Object firstKey;
try
{
firstKey = firstKey();
//do not decrement size counter here - simple trim
if (super.remove(firstKey) != null)
{
//todo handle case when map is already empty
removed.incrementAndGet();
return;
}
}
catch (NoSuchElementException e)
{
//ok
}
}
}
public int mySize()
{
return size.get();
}
@Override
public Object remove(Object key)
{
Object removed = super.remove(key);
if (removed != null)
{
size.decrementAndGet();
}
return removed;
}
}
}
|
|
|
|
|
 |
|
|
Страница 1 из 2 На страницу 1, 2 След. |
Список форумов
-> Нити и процессы |
|
Вы не можете начинать темы Вы не можете отвечать на сообщения Вы не можете редактировать свои сообщения Вы не можете удалять свои сообщения Вы не можете голосовать в опросах
|
|